You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 03:59:56 UTC

[37/50] [abbrv] git commit: ACCUMULO-378 Try to get some test stabilization

ACCUMULO-378 Try to get some test stabilization


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8df4f41b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8df4f41b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8df4f41b

Branch: refs/heads/ACCUMULO-378
Commit: 8df4f41b5d725da7b5ef3c0de8f7219b98085be0
Parents: d3cc1fe
Author: Josh Elser <el...@apache.org>
Authored: Mon May 19 17:47:50 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon May 19 17:47:50 2014 -0400

----------------------------------------------------------------------
 .../client/impl/ReplicationOperationsImpl.java  | 18 +++++----
 .../master/replication/FinishedWorkUpdater.java |  6 ++-
 .../RemoveCompleteReplicationRecords.java       | 11 +++++-
 .../org/apache/accumulo/tserver/Tablet.java     |  2 +-
 .../replication/ReplicationSequentialIT.java    | 41 +++++++++++---------
 5 files changed, 48 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8df4f41b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 752952d..8ee09cb 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -193,21 +193,23 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
         entry.getKey().getRow(rowHolder);
         entry.getKey().getColumnFamily(colfHolder);
 
-        String row;
+        String file;
         if (colfHolder.equals(ReplicationSection.COLF)) {
-          row = rowHolder.toString();
-          row = row.substring(ReplicationSection.getRowPrefix().length());
+          file = rowHolder.toString();
+          file = file.substring(ReplicationSection.getRowPrefix().length());
         } else if (colfHolder.equals(OrderSection.NAME)) {
-          row = OrderSection.getFile(entry.getKey(), rowHolder);
+          file = OrderSection.getFile(entry.getKey(), rowHolder);
+          long timeClosed = OrderSection.getTimeClosed(entry.getKey(), rowHolder);
+          log.debug("Order section: {} and {}", timeClosed, file);
         } else {
-          row = rowHolder.toString();
+          file = rowHolder.toString();
         }
 
-        log.debug("Processing {}", row);
+        log.debug("Evaluating if {} is still needed", file);
 
         // Skip files that we didn't observe when we started (new files/data)
-        if (!relevantLogs.contains(row)) {
-          log.debug("Found file that we didn't care about {}", row);
+        if (!relevantLogs.contains(file)) {
+          log.debug("Found file that we didn't care about {}", file);
           continue;
         }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8df4f41b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
index 68bab64..5e0d726 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
@@ -94,7 +94,7 @@ public class FinishedWorkUpdater implements Runnable {
           continue;
         }
 
-        log.debug("Processing work progress from {}", serializedRow.getKey().getRow());
+        log.debug("Processing work progress for {}", serializedRow.getKey().getRow());
 
         Map<String,Long> tableIdToProgress = new HashMap<>();
         boolean error = false;
@@ -124,6 +124,10 @@ public class FinishedWorkUpdater implements Runnable {
           tableIdToProgress.put(target.getSourceTableId(), Math.min(tableIdToProgress.get(target.getSourceTableId()), status.getBegin()));
         }
 
+        for (Entry<String,Long> progressByTable : tableIdToProgress.entrySet()) {
+          log.debug("For {}, source table ID {} has replicated through {}", serializedRow.getKey().getRow(), progressByTable.getKey(), progressByTable.getValue());
+        }
+
         if (error) {
           continue;
         }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8df4f41b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
index e661174..35ce374 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
@@ -33,7 +33,6 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
@@ -145,6 +144,7 @@ public class RemoveCompleteReplicationRecords implements Runnable {
 
     Mutation m = new Mutation(row);
     Status status = null;
+    long closedTime = -1l;
     for (Entry<Key,Value> entry : columns.entrySet()) {
       try {
         status = Status.parseFrom(entry.getValue().get());
@@ -158,6 +158,14 @@ public class RemoveCompleteReplicationRecords implements Runnable {
         return 0l;
       }
 
+      if (status.hasClosedTime()) {
+        if (closedTime == -1) {
+          closedTime = status.getClosedTime();
+        } else if (closedTime != status.getClosedTime()) {
+          log.warn("Inconsistent closed time for {}, values seen: {} and {}", row, closedTime, status.getClosedTime());
+        }
+      }
+
       Key k = entry.getKey();
       k.getColumnFamily(colf);
       k.getColumnQualifier(colq);
@@ -172,6 +180,7 @@ public class RemoveCompleteReplicationRecords implements Runnable {
     ReplicationTarget target = ReplicationTarget.from(colq);
 
     Mutation orderMutation = OrderSection.createMutation(row.toString(), status.getClosedTime());
+    log.info("Deleting {} from order section with tableID {}", new Key(new Text(orderMutation.getRow())).toStringNoTruncate(), target.getSourceTableId());
     orderMutation.putDelete(OrderSection.NAME, new Text(target.getSourceTableId()));
 
     // Send the mutation deleting all the columns at once.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8df4f41b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index 866450c..efcd6c2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -1407,7 +1407,7 @@ public class Tablet {
         if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
           Status status = StatusUtil.fileClosed(System.currentTimeMillis());
           for (LogEntry logEntry : logEntries) {
-            log.debug("Writing closed status to replication table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
+            log.debug("Writing closed status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
             ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logEntry.logSet, status);
           }
         }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8df4f41b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
index dce4e17..f1d25ae 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.test.replication;
 
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -160,6 +161,8 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
       }
     }
 
+    final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
+
     for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
       cluster.killProcess(ServerType.TABLET_SERVER, proc);
     }
@@ -189,7 +192,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
 
       @Override
       public Boolean call() throws Exception {
-        connMaster.replicationOperations().drain(masterTable);
+        connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
         log.info("Drain completed");
         return true;
       }
@@ -200,7 +203,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
       future.get(30, TimeUnit.SECONDS);
     } catch (TimeoutException e) {
       future.cancel(true);
-      Assert.fail("Drain did not finish within 5 seconds");
+      Assert.fail("Drain did not finish within 30 seconds");
     }
 
     log.info("drain completed");
@@ -297,11 +300,13 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
 
       // Write some data to table1
       BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
+      long masterTable1Records = 0l; 
       for (int rows = 0; rows < 2500; rows++) {
         Mutation m = new Mutation(masterTable1 + rows);
         for (int cols = 0; cols < 100; cols++) {
           String value = Integer.toString(cols);
           m.put(value, "", value);
+          masterTable1Records++;
         }
         bw.addMutation(m);
       }
@@ -310,11 +315,13 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
 
       // Write some data to table2
       bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
+      long masterTable2Records = 0l;
       for (int rows = 0; rows < 2500; rows++) {
         Mutation m = new Mutation(masterTable2 + rows);
         for (int cols = 0; cols < 100; cols++) {
           String value = Integer.toString(cols);
           m.put(value, "", value);
+          masterTable2Records++;
         }
         bw.addMutation(m);
       }
@@ -327,6 +334,9 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
         Thread.sleep(500);
       }
 
+      Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
+          masterTable2);
+
       // Restart the tserver to force a close on the WAL
       for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
         cluster.killProcess(ServerType.TABLET_SERVER, proc);
@@ -335,22 +345,15 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
 
       log.info("Restarted the tserver");
 
-      // Wait until we fully replicated something
-      boolean fullyReplicated = false;
-      for (int i = 0; i < 10 && !fullyReplicated; i++) {
-        UtilWaitThread.sleep(2000);
-
-        Scanner s = ReplicationTable.getScanner(connMaster);
-        WorkSection.limit(s);
-        for (Entry<Key,Value> entry : s) {
-          Status status = Status.parseFrom(entry.getValue().get());
-          if (StatusUtil.isFullyReplicated(status)) {
-            fullyReplicated |= true;
-          }
-        }
-      }
+      // Read the data -- the tserver is back up and running
+      for (@SuppressWarnings("unused") Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {}
+
+      // Wait for both tables to be replicated
+      log.info("Waiting for {} for {}", filesFor1, masterTable1);
+      connMaster.replicationOperations().drain(masterTable1, filesFor1);
 
-      Assert.assertNotEquals(0, fullyReplicated);
+      log.info("Waiting for {} for {}", filesFor2, masterTable2);
+      connMaster.replicationOperations().drain(masterTable2, filesFor2);
 
       long countTable = 0l;
       for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
@@ -360,7 +363,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
       }
 
       log.info("Found {} records in {}", countTable, peerTable1);
-      Assert.assertTrue(countTable > 0);
+      Assert.assertEquals(masterTable1Records, countTable);
 
       countTable = 0l;
       for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
@@ -370,7 +373,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
       }
 
       log.info("Found {} records in {}", countTable, peerTable2);
-      Assert.assertTrue(countTable > 0);
+      Assert.assertEquals(masterTable2Records, countTable);
 
     } finally {
       peer1Cluster.stop();