You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 03:59:54 UTC
[35/50] [abbrv] git commit: ACCUMULO-2819 More test updates for the
sequential work assigner
ACCUMULO-2819 More test updates for the sequential work assigner
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9f779184
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9f779184
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9f779184
Branch: refs/heads/ACCUMULO-378
Commit: 9f779184cbfa8e3c18be137f8e0471bc0bae4491
Parents: 26a88b4
Author: Josh Elser <el...@apache.org>
Authored: Mon May 19 15:40:49 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon May 19 15:40:49 2014 -0400
----------------------------------------------------------------------
.../DistributedWorkQueueWorkAssigner.java | 6 +-
.../master/replication/ReplicationDriver.java | 1 -
.../tserver/log/TabletServerLogger.java | 10 +-
.../test/replication/MockReplicaSystem.java | 69 +++++++++++++
.../test/replication/CyclicReplicationIT.java | 27 +++--
.../replication/ReplicationSequentialIT.java | 60 ++++++++---
.../test/replication/ReplicationWithGCIT.java | 102 ++++++-------------
.../replication/ReplicationWithMakerTest.java | 10 +-
test/src/test/resources/log4j.properties | 1 +
9 files changed, 179 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
index 84f9af5..f04f3e8 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
@@ -217,8 +217,6 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
return;
}
- log.info("Creating batchscanner to read Work records from the replication table");
-
WorkSection.limit(bs);
bs.setRanges(Collections.singleton(new Range()));
Text buffer = new Text();
@@ -261,12 +259,12 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
}
}
} finally {
+ log.info("Created work entries for {} files", filesWorkWasCreatedFrom);
+
if (null != bs) {
bs.close();
}
}
-
- log.info("Created work entries for {} files", filesWorkWasCreatedFrom);
}
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index 75fe5f3..3069c97 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -24,7 +24,6 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.master.Master;
-import org.apache.accumulo.server.replication.ReplicationTable;
import org.apache.log4j.Logger;
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 67127f1..d8c4279 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -239,7 +239,7 @@ public class TabletServerLogger {
return write(sessions, mincFinish, writer);
}
- private int write(Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
+ private int write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
// Work very hard not to lock this during calls to the outside world
int currentLogSet = logSetId.get();
@@ -268,7 +268,7 @@ public class TabletServerLogger {
}
// Need to release
- KeyExtent extent = commitSession.getExtent();tserver.getTableConfiguration(extent).getNamespaceConfiguration();
+ KeyExtent extent = commitSession.getExtent();
if (ReplicationConfigurationUtil.isEnabled(extent, tserver.getTableConfiguration(extent))) {
Set<String> logs = new HashSet<String>();
for (DfsLogger logger : copy) {
@@ -328,6 +328,7 @@ public class TabletServerLogger {
@Override
void withWriteLock() throws IOException {
close();
+ closeForReplication(sessions);
}
});
}
@@ -343,11 +344,16 @@ public class TabletServerLogger {
@Override
void withWriteLock() throws IOException {
close();
+ closeForReplication(sessions);
}
});
return seq;
}
+ protected void closeForReplication(Collection<CommitSession> sessions) {
+ // TODO We can close the WAL here for replication purposes
+ }
+
public int defineTablet(final CommitSession commitSession) throws IOException {
// scribble this into the metadata tablet, too.
if (!enabled(commitSession))
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
new file mode 100644
index 0000000..cafd9b7
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import org.apache.accumulo.core.client.replication.ReplicaSystem;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Fake ReplicaSystem which returns that the data was fully replicated after some sleep period (in milliseconds)
+ * <p>
+ * Default sleep amount is 0ms
+ */
+public class MockReplicaSystem implements ReplicaSystem {
+ private static final Logger log = LoggerFactory.getLogger(MockReplicaSystem.class);
+
+ private long sleep = 0;
+
+ @Override
+ public Status replicate(Path p, Status status, ReplicationTarget target) {
+ Status.Builder builder = Status.newBuilder(status);
+ if (status.getInfiniteEnd()) {
+ builder.setBegin(Long.MAX_VALUE);
+ } else {
+ builder.setBegin(status.getEnd());
+ }
+
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ log.error("Interrupted while sleeping, will report no progress", e);
+ Thread.currentThread().interrupt();
+ return status;
+ }
+
+
+ Status newStatus = builder.build();
+ log.info("Received {} returned {}", TextFormat.shortDebugString(status), TextFormat.shortDebugString(newStatus));
+ return newStatus;
+ }
+
+ @Override
+ public void configure(String configuration) {
+ try {
+ sleep = Long.parseLong(configuration);
+ } catch (NumberFormatException e) {
+ log.warn("Could not parse {} as an integer, using default sleep of {}", configuration, sleep, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
index 75f6a3f..b30dc39 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -48,6 +49,8 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterables;
@@ -55,6 +58,7 @@ import com.google.common.collect.Iterables;
*
*/
public class CyclicReplicationIT {
+ private static final Logger log = LoggerFactory.getLogger(CyclicReplicationIT.class);
@Rule
public TestName testName = new TestName();
@@ -80,7 +84,7 @@ public class CyclicReplicationIT {
out.close();
}
- @Test
+ @Test(timeout = 5 * 60 * 1000)
public void dataIsNotOverReplicated() throws Exception {
File master1Dir = createTestDir("master1"), master2Dir = createTestDir("master2");
String password = "password";
@@ -155,9 +159,12 @@ public class CyclicReplicationIT {
Mutation m = new Mutation("row");
m.put("count", "", "1");
bw.addMutation(m);
- bw.flush();
bw.close();
+ Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Cluster.getInstanceName());
+
+ log.info("Found {} that need replication from master1", files);
+
// Kill and restart the tserver to close the WAL on master1
for (ProcessReference proc : master1Cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
master1Cluster.killProcess(ServerType.TABLET_SERVER, proc);
@@ -165,21 +172,26 @@ public class CyclicReplicationIT {
master1Cluster.exec(TabletServer.class);
+ log.info("Restarted tserver on master1");
+
// Sanity check that the element is there on master1
Scanner s = connMaster1.createScanner(master1Cluster.getInstanceName(), Authorizations.EMPTY);
Entry<Key,Value> entry = Iterables.getOnlyElement(s);
Assert.assertEquals("1", entry.getValue().toString());
- Thread.sleep(5000);
-
// Wait for this table to replicate
- connMaster1.replicationOperations().drain(master1Cluster.getInstanceName());
+ connMaster1.replicationOperations().drain(master1Cluster.getInstanceName(), files);
+
+ Thread.sleep(5000);
// Check that the element made it to master2 only once
s = connMaster2.createScanner(master2Cluster.getInstanceName(), Authorizations.EMPTY);
entry = Iterables.getOnlyElement(s);
Assert.assertEquals("1", entry.getValue().toString());
+ // Wait for master2 to finish replicating it back
+ files = connMaster2.replicationOperations().referencedFiles(master2Cluster.getInstanceName());
+
// Kill and restart the tserver to close the WAL on master2
for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
master2Cluster.killProcess(ServerType.TABLET_SERVER, proc);
@@ -192,10 +204,9 @@ public class CyclicReplicationIT {
entry = Iterables.getOnlyElement(s);
Assert.assertEquals("1", entry.getValue().toString());
- Thread.sleep(5000);
+ connMaster2.replicationOperations().drain(master2Cluster.getInstanceName(), files);
- // Wait for master2 to finish replicating it back
- connMaster2.replicationOperations().drain(master2Cluster.getInstanceName());
+ Thread.sleep(5000);
// Verify that the entry wasn't sent back to master1
s = connMaster1.createScanner(master1Cluster.getInstanceName(), Authorizations.EMPTY);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
index 0683a57..dce4e17 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
@@ -44,10 +44,13 @@ import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.master.replication.SequentialWorkAssigner;
+import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
import org.apache.accumulo.server.replication.ReplicationTable;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
@@ -86,8 +89,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
cfg.setProperty(Property.REPLICATION_NAME, "master");
cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
- cfg.useMiniDFS(true);
-// hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@Test(timeout = 60 * 5000)
@@ -158,6 +160,31 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
}
}
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.exec(TabletServer.class);
+
+ log.info("TabletServer restarted");
+ for (@SuppressWarnings("unused") Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
+ log.info("TabletServer is online");
+
+ log.info("");
+ log.info("Fetching metadata records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+ if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ } else {
+ log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+ }
+ }
+
+ log.info("");
+ log.info("Fetching replication records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ }
+
Future<Boolean> future = executor.submit(new Callable<Boolean>() {
@Override
@@ -169,12 +196,17 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
});
- connMaster.tableOperations().compact(masterTable, null, null, true, true);
+ try {
+ future.get(30, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ future.cancel(true);
+ Assert.fail("Drain did not finish within 5 seconds");
+ }
+
+ log.info("drain completed");
log.info("");
- log.info("Compaction completed");
-
- log.debug("");
+ log.info("Fetching metadata records:");
for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
@@ -183,13 +215,8 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
}
}
- try {
- future.get(15, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- Assert.fail("Drain did not finish within 5 seconds");
- }
-
log.info("");
+ log.info("Fetching replication records:");
for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
}
@@ -300,8 +327,13 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
Thread.sleep(500);
}
- connMaster.tableOperations().compact(masterTable1, null, null, true, false);
- connMaster.tableOperations().compact(masterTable2, null, null, true, false);
+ // Restart the tserver to force a close on the WAL
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.exec(TabletServer.class);
+
+ log.info("Restarted the tserver");
// Wait until we fully replicated something
boolean fullyReplicated = false;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
index 101001f..23da719 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
@@ -30,7 +30,6 @@ import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystem;
import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
@@ -58,7 +57,6 @@ import org.apache.accumulo.server.replication.ReplicationTable;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
@@ -76,33 +74,6 @@ import com.google.protobuf.TextFormat;
public class ReplicationWithGCIT extends ConfigurableMacIT {
private static final Logger log = LoggerFactory.getLogger(ReplicationWithGCIT.class);
- /**
- * Fake ReplicaSystem which immediately returns that the data was fully replicated
- */
- public static class MockReplicaSystem implements ReplicaSystem {
- private static final Logger log = LoggerFactory.getLogger(MockReplicaSystem.class);
-
- public MockReplicaSystem() {}
-
- @Override
- public Status replicate(Path p, Status status, ReplicationTarget target) {
- Status.Builder builder = Status.newBuilder(status);
- if (status.getInfiniteEnd()) {
- builder.setBegin(Long.MAX_VALUE);
- } else {
- builder.setBegin(status.getEnd());
- }
-
- Status newStatus = builder.build();
- log.info("Received {} returned {}", TextFormat.shortDebugString(status), TextFormat.shortDebugString(newStatus));
- return newStatus;
- }
-
- @Override
- public void configure(String configuration) {}
-
- }
-
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setNumTservers(1);
@@ -358,10 +329,10 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
}
- @Test
+ @Test(timeout = 5 * 60 * 1000)
public void replicatedStatusEntriesAreDeleted() throws Exception {
- Connector conn = getConnector();
- FileSystem fs = FileSystem.getLocal(new Configuration());
+ final Connector conn = getConnector();
+ log.info("Got connector to MAC");
String table1 = "table1";
// replication shouldn't exist when we begin
@@ -377,8 +348,9 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
// Replicate table1 to cluster1 in the table with id of '4'
conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
+ // Use the MockReplicaSystem impl and sleep for 5 seconds
conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
- ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
+ ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "5000"));
attempts = 0;
} catch (Exception e) {
attempts--;
@@ -469,17 +441,23 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
}
/**
- * By this point, we should have data ingested into a table, with at least one WAL as a candidate for replication. It may or may not yet be closed.
+ * By this point, we should have data ingested into a table, with at least one WAL as a candidate for replication. Compacting the table should close all
+ * open WALs, which should ensure all records we're going to replicate have entries in the replication table, and nothing will exist in the metadata table
+ * anymore
*/
+ log.info("Killing tserver");
// Kill the tserver(s) and restart them
// to ensure that the WALs we previously observed all move to closed.
for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
cluster.killProcess(ServerType.TABLET_SERVER, proc);
}
+ log.info("Starting tserver");
cluster.exec(TabletServer.class);
+ log.info("Waiting to read tables");
+
// Make sure we can read all the tables (recovery complete)
for (String table : new String[] {MetadataTable.NAME, table1}) {
s = conn.createScanner(table, new Authorizations());
@@ -487,48 +465,39 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
Entry<Key,Value> entry : s) {}
}
- // Need to make sure we get the entries in metadata
- boolean foundResults = false;
- for (int i = 0; i < 5 && !foundResults; i++) {
- s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
- if (Iterables.size(s) > 0) {
- foundResults = true;
- }
- Thread.sleep(1000);
- }
-
- Assert.assertTrue("Did not find any replication entries in the metadata table", foundResults);
-
+ log.info("Checking for replication entries in replication");
// Then we need to get those records over to the replication table
- foundResults = false;
- for (int i = 0; i < 5 && !foundResults; i++) {
+ boolean foundResults = false;
+ for (int i = 0; i < 5; i++) {
s = ReplicationTable.getScanner(conn);
if (Iterables.size(s) > 0) {
foundResults = true;
+ break;
}
Thread.sleep(1000);
}
Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
- s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
- for (Entry<Key,Value> entry : s) {
- String row = entry.getKey().getRow().toString();
- Path file = new Path(row.substring(ReplicationSection.getRowPrefix().length()));
- Assert.assertTrue(file + " did not exist when it should", fs.exists(file));
+ // We expect no records in the metadata table after compaction. We have to poll
+ // because we have to wait for the StatusMaker's next iteration which will clean
+ // up the dangling record after we create the record in the replication table
+ foundResults = true;
+ for (int i = 0; i < 5; i++) {
+ s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(ReplicationSection.getRange());
+ if (Iterables.size(s) == 0) {
+ foundResults = false;
+ break;
+ }
+ Thread.sleep(1000);
}
- /**
- * After recovery completes, we should have unreplicated, closed Status messages. The close happens at the beginning of log recovery.
- * The MockReplicaSystem we configured will just automatically say the data has been replicated, so this should then created replicated
- * and closed Status messages.
- */
+ Assert.assertFalse("Replication status messages were not cleaned up from metadata table, check why the StatusMaker didn't delete them", foundResults);
/**
* After we set the begin to Long.MAX_VALUE, the RemoveCompleteReplicationRecords class will start deleting the records which have been closed by
- * CloseWriteAheadLogReferences (which will have been working since we restarted the tserver(s))
+ * the minor compaction and replicated by the MockReplicaSystem
*/
// Wait for a bit since the GC has to run (should be running after a one second delay)
@@ -552,16 +521,5 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
}
Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
-
- // If the replication table entries were deleted, so should the metadata table replication entries
- s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
- recordsFound = 0;
- for (Entry<Key,Value> entry : s) {
- recordsFound++;
- log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
- }
-
- Assert.assertEquals("Found unexpected replication records in the metadata table", 0, recordsFound);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
index aee8a1e..03ac72c 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
@@ -27,10 +27,10 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.util.UtilWaitThread;
@@ -142,13 +142,13 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
Scanner s = ReplicationTable.getScanner(conn);
StatusSection.limit(s);
Entry<Key,Value> entry = null;
+ Status expectedStatus = StatusUtil.openWithUnknownLength();
attempts = 5;
// This record will move from new to new with infinite length because of the minc (flush)
while (null == entry && attempts > 0) {
try {
entry = Iterables.getOnlyElement(s);
- Status actualStatus = Status.parseFrom(entry.getValue().get());
- if (!actualStatus.hasClosedTime() || !actualStatus.getClosed()) {
+ if (!expectedStatus.equals(Status.parseFrom(entry.getValue().get()))) {
entry = null;
// the master process didn't yet fire and write the new mutation, wait for it to do
// so and try to read it again
@@ -171,9 +171,7 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
}
Assert.assertNotNull("Could not find expected entry in replication table", entry);
- Status actualStatus = Status.parseFrom(entry.getValue().get());
- Assert.assertTrue("Expected to find a replication entry that is closed with infinite length: " + ProtobufUtil.toString(actualStatus),
- actualStatus.getClosed() && actualStatus.hasClosedTime());
+ Assert.assertEquals("Expected to find a replication entry that is open with infinite length", expectedStatus, Status.parseFrom(entry.getValue().get()));
// Try a couple of times to watch for the work record to be created
boolean notFound = true;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/test/src/test/resources/log4j.properties b/test/src/test/resources/log4j.properties
index 171d690..11ff405 100644
--- a/test/src/test/resources/log4j.properties
+++ b/test/src/test/resources/log4j.properties
@@ -19,6 +19,7 @@ log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{2}] %-5p: %m%n
log4j.logger.org.apache.accumulo.core=DEBUG
+log4j.logger.org.apache.accumulo.core.client.impl.MasterClient=INFO
log4j.logger.org.apache.accumulo.core.client.impl.ServerClient=ERROR
log4j.logger.org.apache.accumulo.core.util.shell.Shell.audit=off
log4j.logger.org.apache.accumulo.core.util.shell.Shell=FATAL