You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 03:59:56 UTC
[37/50] [abbrv] git commit: ACCUMULO-378 Try to get some test stabilization
ACCUMULO-378 Try to get some test stabilization
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8df4f41b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8df4f41b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8df4f41b
Branch: refs/heads/ACCUMULO-378
Commit: 8df4f41b5d725da7b5ef3c0de8f7219b98085be0
Parents: d3cc1fe
Author: Josh Elser <el...@apache.org>
Authored: Mon May 19 17:47:50 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon May 19 17:47:50 2014 -0400
----------------------------------------------------------------------
.../client/impl/ReplicationOperationsImpl.java | 18 +++++----
.../master/replication/FinishedWorkUpdater.java | 6 ++-
.../RemoveCompleteReplicationRecords.java | 11 +++++-
.../org/apache/accumulo/tserver/Tablet.java | 2 +-
.../replication/ReplicationSequentialIT.java | 41 +++++++++++---------
5 files changed, 48 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8df4f41b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 752952d..8ee09cb 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -193,21 +193,23 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
entry.getKey().getRow(rowHolder);
entry.getKey().getColumnFamily(colfHolder);
- String row;
+ String file;
if (colfHolder.equals(ReplicationSection.COLF)) {
- row = rowHolder.toString();
- row = row.substring(ReplicationSection.getRowPrefix().length());
+ file = rowHolder.toString();
+ file = file.substring(ReplicationSection.getRowPrefix().length());
} else if (colfHolder.equals(OrderSection.NAME)) {
- row = OrderSection.getFile(entry.getKey(), rowHolder);
+ file = OrderSection.getFile(entry.getKey(), rowHolder);
+ long timeClosed = OrderSection.getTimeClosed(entry.getKey(), rowHolder);
+ log.debug("Order section: {} and {}", timeClosed, file);
} else {
- row = rowHolder.toString();
+ file = rowHolder.toString();
}
- log.debug("Processing {}", row);
+ log.debug("Evaluating if {} is still needed", file);
// Skip files that we didn't observe when we started (new files/data)
- if (!relevantLogs.contains(row)) {
- log.debug("Found file that we didn't care about {}", row);
+ if (!relevantLogs.contains(file)) {
+ log.debug("Found file that we didn't care about {}", file);
continue;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8df4f41b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
index 68bab64..5e0d726 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
@@ -94,7 +94,7 @@ public class FinishedWorkUpdater implements Runnable {
continue;
}
- log.debug("Processing work progress from {}", serializedRow.getKey().getRow());
+ log.debug("Processing work progress for {}", serializedRow.getKey().getRow());
Map<String,Long> tableIdToProgress = new HashMap<>();
boolean error = false;
@@ -124,6 +124,10 @@ public class FinishedWorkUpdater implements Runnable {
tableIdToProgress.put(target.getSourceTableId(), Math.min(tableIdToProgress.get(target.getSourceTableId()), status.getBegin()));
}
+ for (Entry<String,Long> progressByTable : tableIdToProgress.entrySet()) {
+ log.debug("For {}, source table ID {} has replicated through {}", serializedRow.getKey().getRow(), progressByTable.getKey(), progressByTable.getValue());
+ }
+
if (error) {
continue;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8df4f41b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
index e661174..35ce374 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
@@ -33,7 +33,6 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
@@ -145,6 +144,7 @@ public class RemoveCompleteReplicationRecords implements Runnable {
Mutation m = new Mutation(row);
Status status = null;
+ long closedTime = -1l;
for (Entry<Key,Value> entry : columns.entrySet()) {
try {
status = Status.parseFrom(entry.getValue().get());
@@ -158,6 +158,14 @@ public class RemoveCompleteReplicationRecords implements Runnable {
return 0l;
}
+ if (status.hasClosedTime()) {
+ if (closedTime == -1) {
+ closedTime = status.getClosedTime();
+ } else if (closedTime != status.getClosedTime()) {
+ log.warn("Inconsistent closed time for {}, values seen: {} and {}", row, closedTime, status.getClosedTime());
+ }
+ }
+
Key k = entry.getKey();
k.getColumnFamily(colf);
k.getColumnQualifier(colq);
@@ -172,6 +180,7 @@ public class RemoveCompleteReplicationRecords implements Runnable {
ReplicationTarget target = ReplicationTarget.from(colq);
Mutation orderMutation = OrderSection.createMutation(row.toString(), status.getClosedTime());
+ log.info("Deleting {} from order section with tableID {}", new Key(new Text(orderMutation.getRow())).toStringNoTruncate(), target.getSourceTableId());
orderMutation.putDelete(OrderSection.NAME, new Text(target.getSourceTableId()));
// Send the mutation deleting all the columns at once.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8df4f41b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index 866450c..efcd6c2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -1407,7 +1407,7 @@ public class Tablet {
if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
Status status = StatusUtil.fileClosed(System.currentTimeMillis());
for (LogEntry logEntry : logEntries) {
- log.debug("Writing closed status to replication table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
+ log.debug("Writing closed status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logEntry.logSet, status);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8df4f41b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
index dce4e17..f1d25ae 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.test.replication;
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -160,6 +161,8 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
}
}
+ final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
+
for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
cluster.killProcess(ServerType.TABLET_SERVER, proc);
}
@@ -189,7 +192,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
@Override
public Boolean call() throws Exception {
- connMaster.replicationOperations().drain(masterTable);
+ connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
log.info("Drain completed");
return true;
}
@@ -200,7 +203,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
future.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
- Assert.fail("Drain did not finish within 5 seconds");
+ Assert.fail("Drain did not finish within 30 seconds");
}
log.info("drain completed");
@@ -297,11 +300,13 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
// Write some data to table1
BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
+ long masterTable1Records = 0l;
for (int rows = 0; rows < 2500; rows++) {
Mutation m = new Mutation(masterTable1 + rows);
for (int cols = 0; cols < 100; cols++) {
String value = Integer.toString(cols);
m.put(value, "", value);
+ masterTable1Records++;
}
bw.addMutation(m);
}
@@ -310,11 +315,13 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
// Write some data to table2
bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
+ long masterTable2Records = 0l;
for (int rows = 0; rows < 2500; rows++) {
Mutation m = new Mutation(masterTable2 + rows);
for (int cols = 0; cols < 100; cols++) {
String value = Integer.toString(cols);
m.put(value, "", value);
+ masterTable2Records++;
}
bw.addMutation(m);
}
@@ -327,6 +334,9 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
Thread.sleep(500);
}
+ Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
+ masterTable2);
+
// Restart the tserver to force a close on the WAL
for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
cluster.killProcess(ServerType.TABLET_SERVER, proc);
@@ -335,22 +345,15 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
log.info("Restarted the tserver");
- // Wait until we fully replicated something
- boolean fullyReplicated = false;
- for (int i = 0; i < 10 && !fullyReplicated; i++) {
- UtilWaitThread.sleep(2000);
-
- Scanner s = ReplicationTable.getScanner(connMaster);
- WorkSection.limit(s);
- for (Entry<Key,Value> entry : s) {
- Status status = Status.parseFrom(entry.getValue().get());
- if (StatusUtil.isFullyReplicated(status)) {
- fullyReplicated |= true;
- }
- }
- }
+ // Read the data -- the tserver is back up and running
+ for (@SuppressWarnings("unused") Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {}
+
+ // Wait for both tables to be replicated
+ log.info("Waiting for {} for {}", filesFor1, masterTable1);
+ connMaster.replicationOperations().drain(masterTable1, filesFor1);
- Assert.assertNotEquals(0, fullyReplicated);
+ log.info("Waiting for {} for {}", filesFor2, masterTable2);
+ connMaster.replicationOperations().drain(masterTable2, filesFor2);
long countTable = 0l;
for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
@@ -360,7 +363,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
}
log.info("Found {} records in {}", countTable, peerTable1);
- Assert.assertTrue(countTable > 0);
+ Assert.assertEquals(masterTable1Records, countTable);
countTable = 0l;
for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
@@ -370,7 +373,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
}
log.info("Found {} records in {}", countTable, peerTable2);
- Assert.assertTrue(countTable > 0);
+ Assert.assertEquals(masterTable2Records, countTable);
} finally {
peer1Cluster.stop();