Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/14 06:55:02 UTC
[02/51] [abbrv] git commit: ACCUMULO-378 Test class consolidation
ACCUMULO-378 Test class consolidation
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9d9b5ed2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9d9b5ed2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9d9b5ed2
Branch: refs/heads/master
Commit: 9d9b5ed24f3e425459108a993ab2cea121d1b612
Parents: 3b727cf
Author: Josh Elser <el...@apache.org>
Authored: Mon May 26 13:48:55 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon May 26 13:48:55 2014 -0400
----------------------------------------------------------------------
.../replication/SequentialWorkAssigner.java | 2 +-
.../replication/ReplicationProcessor.java | 4 +
.../replication/MultiTserverReplicationIT.java | 113 ++
.../replication/ReplicationDeadlockTest.java | 170 ---
.../ReplicationFilesClosedAfterUnusedTest.java | 172 ---
.../test/replication/ReplicationIT.java | 338 +++++-
.../ReplicationPortAdvertisementIT.java | 113 --
.../replication/ReplicationSequentialIT.java | 402 -------
.../replication/ReplicationSourceOnlyIT.java | 208 ----
.../replication/ReplicationTablesMacTest.java | 90 --
.../test/replication/ReplicationTest.java | 1135 +++++++++++++++++-
.../test/replication/ReplicationWithGCIT.java | 554 ---------
.../replication/ReplicationWithMakerTest.java | 337 ------
13 files changed, 1588 insertions(+), 2050 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index f2d110a..af43d7d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -297,7 +297,7 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
log.debug("Not queueing {} for work as {} must be replicated to {} first", file, keyBeingReplicated, target.getPeerName());
}
} else {
- log.debug("Not queueing work for {} because {} doesn't need replication", file, ProtobufUtil.toString(status));
+ log.debug("Not queueing work for {} to {} because {} doesn't need replication", file, target, ProtobufUtil.toString(status));
if (key.equals(keyBeingReplicated)) {
log.debug("Removing {} from replication state to {} because replication is complete", keyBeingReplicated, target.getPeerName());
queuedWorkForPeer.remove(sourceTableId);
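
The log-message fix above also points at the invariant SequentialWorkAssigner maintains: for a given peer, only one file per source table is queued at a time, and the slot is freed once a file's Status says it no longer needs replication. A minimal sketch of that bookkeeping, with hypothetical names (PeerQueue, offer, complete) that are not the real API:

import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified bookkeeping; the real assigner tracks this per peer
// and keys the in-flight entry by source table id.
class PeerQueue {
  // sourceTableId -> key of the file currently being replicated to this peer
  private final Map<String,String> queuedWorkForPeer = new HashMap<>();

  // Queue a file unless another file for the same source table is in flight.
  boolean offer(String sourceTableId, String fileKey) {
    String inFlight = queuedWorkForPeer.get(sourceTableId);
    if (inFlight != null && !inFlight.equals(fileKey)) {
      return false; // the earlier file must be fully replicated first
    }
    queuedWorkForPeer.put(sourceTableId, fileKey);
    return true;
  }

  // Free the slot once the file no longer needs replication.
  void complete(String sourceTableId, String fileKey) {
    if (fileKey.equals(queuedWorkForPeer.get(sourceTableId))) {
      queuedWorkForPeer.remove(sourceTableId);
    }
  }
}
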
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index f6fe91f..50c79d6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.tserver.replication;
import java.io.IOException;
import java.util.Map;
+import java.util.NoSuchElementException;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -96,6 +97,9 @@ public class ReplicationProcessor implements Processor {
} catch (InvalidProtocolBufferException e) {
log.error("Could not deserialize Status from Work section for {} and ", file, target);
throw new RuntimeException("Could not parse Status for work record", e);
+ } catch (NoSuchElementException e) {
+ log.error("Assigned work for {} to {} but could not find work record", file, target);
+ return;
}
log.debug("Current status for {} replicating to {}: {}", file, target, ProtobufUtil.toString(status));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
new file mode 100644
index 0000000..96e8b52
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+import com.google.common.net.HostAndPort;
+
+/**
+ * Verify that tservers and the master advertise their replication service addresses in ZooKeeper.
+ */
+public class MultiTserverReplicationIT extends ConfigurableMacIT {
+ private static final Logger log = LoggerFactory.getLogger(MultiTserverReplicationIT.class);
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(2);
+ }
+
+ @Test
+ public void tserverReplicationServicePortsAreAdvertised() throws Exception {
+ // Wait for the cluster to be up
+ Connector conn = getConnector();
+ Instance inst = conn.getInstance();
+
+ // Wait for a tserver to come up to fulfill this request
+ conn.tableOperations().create("foo");
+ Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
+ Assert.assertEquals(0, Iterables.size(s));
+
+ ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+ Set<String> tserverHost = new HashSet<>();
+ tserverHost.addAll(zreader.getChildren(ZooUtil.getRoot(inst) + Constants.ZTSERVERS));
+
+ Set<HostAndPort> replicationServices = new HashSet<>();
+
+ for (String tserver : tserverHost) {
+ try {
+ byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS + "/" + tserver, null);
+ HostAndPort replAddress = HostAndPort.fromString(new String(portData, StandardCharsets.UTF_8));
+ replicationServices.add(replAddress);
+ } catch (Exception e) {
+ log.error("Could not find port for {}", tserver, e);
+ Assert.fail("Did not find replication port advertisement for " + tserver);
+ }
+ }
+
+ // Each tserver should have advertised its own replication service
+ Assert.assertEquals("Expected an equal number of replication services and tservers", tserverHost.size(), replicationServices.size());
+ }
+
+ @Test
+ public void masterReplicationServicePortsAreAdvertised() throws Exception {
+ // Wait for the cluster to be up
+ Connector conn = getConnector();
+ Instance inst = conn.getInstance();
+
+ // Wait for a tserver to come up to fulfill this request
+ conn.tableOperations().create("foo");
+ Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
+ Assert.assertEquals(0, Iterables.size(s));
+
+ ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+
+ // Should have one master instance
+ Assert.assertEquals(1, inst.getMasterLocations().size());
+
+ // Get the master thrift service addr
+ String masterAddr = Iterables.getOnlyElement(inst.getMasterLocations());
+
+ // Get the master replication coordinator addr
+ String replCoordAddr = new String(zreader.getData(ZooUtil.getRoot(inst) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, null), StandardCharsets.UTF_8);
+
+ // They shouldn't be the same
+ Assert.assertNotEquals(masterAddr, replCoordAddr);
+
+ // Neither address should advertise port zero
+ Assert.assertNotEquals(0, HostAndPort.fromString(masterAddr).getPort());
+ Assert.assertNotEquals(0, HostAndPort.fromString(replCoordAddr).getPort());
+ }
+}
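
Both tests rely on servers publishing their replication service address to ZooKeeper as a plain UTF-8 "host:port" string. A small sketch of how a reader could validate such an advertisement; the znode layout is taken from the Constants used above, and the class name is hypothetical:

import java.nio.charset.StandardCharsets;

import com.google.common.net.HostAndPort;

class AdvertisedAddress {
  // znodeData is the raw bytes read from the advertisement znode
  static HostAndPort parse(byte[] znodeData) {
    String addr = new String(znodeData, StandardCharsets.UTF_8);
    HostAndPort hp = HostAndPort.fromString(addr);
    if (!hp.hasPort() || hp.getPort() == 0) {
      throw new IllegalStateException("Service advertised without a usable port: " + addr);
    }
    return hp;
  }
}
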
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
deleted file mode 100644
index 418d717..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.Test;
-
-/**
- *
- */
-public class ReplicationDeadlockTest extends ConfigurableMacIT {
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
- cfg.setProperty(Property.GC_CYCLE_START, "1s");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- private Set<String> metadataWals(Connector conn) throws Exception {
- Scanner s = conn.createScanner(MetadataTable.NAME, new Authorizations());
- s.fetchColumnFamily(LogColumnFamily.NAME);
- Set<String> metadataWals = new HashSet<>();
- for (Entry<Key,Value> entry : s) {
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- for (String log : logEntry.logSet) {
- metadataWals.add(new Path(log).toString());
- }
- }
- return metadataWals;
- }
-
- @Test(timeout = 60 * 1000)
- public void noDeadlock() throws Exception {
- final Connector conn = getConnector();
-
- if (conn.tableOperations().exists(ReplicationTable.NAME)) {
- conn.tableOperations().delete(ReplicationTable.NAME);
- }
-
- ReplicationTable.create(conn);
- conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
- final AtomicBoolean keepRunning = new AtomicBoolean(true);
- final Set<String> metadataWals = new HashSet<>();
-
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- // Should really be able to interrupt here, but the Scanner throws a fit to the logger
- // when that happens
- while (keepRunning.get()) {
- try {
- metadataWals.addAll(metadataWals(conn));
- } catch (Exception e) {
- log.error("Metadata table doesn't exist");
- }
- }
- }
-
- });
-
- t.start();
-
- String table1 = "table1", table2 = "table2", table3 = "table3";
-
- conn.tableOperations().create(table1);
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- conn.tableOperations().create(table2);
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- conn.tableOperations().create(table3);
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Flush everything to try to make the replication records
- for (String table : Arrays.asList(table1, table2, table3)) {
- conn.tableOperations().flush(table, null, null, true);
- }
-
- keepRunning.set(false);
- t.join(5000);
-
- for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {
- Scanner s = conn.createScanner(table, new Authorizations());
- for (@SuppressWarnings("unused")
- Entry<Key,Value> entry : s) {}
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
deleted file mode 100644
index eb89317..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class ReplicationFilesClosedAfterUnusedTest extends ConfigurableMacIT {
- private static final Logger log = LoggerFactory.getLogger(ReplicationFilesClosedAfterUnusedTest.class);
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
- cfg.setProperty(Property.GC_CYCLE_START, "1s");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "0");
- cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0s");
- cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "0s");
- cfg.setProperty(Property.REPLICATION_NAME, "master");
- cfg.setNumTservers(1);
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- @Test(timeout = 60000)
- public void test() throws Exception {
- Connector conn = getConnector();
-
- String table = "table";
- conn.tableOperations().create(table);
- String tableId = conn.tableOperations().tableIdMap().get(table);
-
- Assert.assertNotNull(tableId);
-
- log.info("Writing to {}", tableId);
-
- conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- // just sleep
- conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
- ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000"));
-
- // Write a mutation to make a log file
- BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
- Mutation m = new Mutation("one");
- m.put("", "", "");
- bw.addMutation(m);
- bw.close();
-
- // Write another to make sure the logger rolls itself?
- bw = conn.createBatchWriter(table, new BatchWriterConfig());
- m = new Mutation("three");
- m.put("", "", "");
- bw.addMutation(m);
- bw.close();
-
- Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
- s.setRange(TabletsSection.getRange(tableId));
- Set<String> wals = new HashSet<>();
- for (Entry<Key,Value> entry : s) {
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- for (String file : logEntry.logSet) {
- Path p = new Path(file);
- wals.add(p.toString());
- }
- }
-
- log.warn("Found wals {}", wals);
-
- // for (int j = 0; j < 5; j++) {
- bw = conn.createBatchWriter(table, new BatchWriterConfig());
- m = new Mutation("three");
- byte[] bytes = new byte[1024 * 1024];
- m.put("1".getBytes(), new byte[0], bytes);
- m.put("2".getBytes(), new byte[0], bytes);
- m.put("3".getBytes(), new byte[0], bytes);
- m.put("4".getBytes(), new byte[0], bytes);
- m.put("5".getBytes(), new byte[0], bytes);
- bw.addMutation(m);
- bw.close();
-
- conn.tableOperations().flush(table, null, null, true);
-
- while (!conn.tableOperations().exists(ReplicationTable.NAME)) {
- UtilWaitThread.sleep(500);
- }
-
- for (int i = 0; i < 5; i++) {
- s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.fetchColumnFamily(LogColumnFamily.NAME);
- s.setRange(TabletsSection.getRange(tableId));
- for (Entry<Key,Value> entry : s) {
- log.info(entry.getKey().toStringNoTruncate() + "=" + entry.getValue());
- }
-
- s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Text buff = new Text();
- boolean allReferencedLogsClosed = true;
- int recordsFound = 0;
- for (Entry<Key,Value> e : s) {
- recordsFound++;
- allReferencedLogsClosed = true;
- StatusSection.getFile(e.getKey(), buff);
- String file = buff.toString();
- if (wals.contains(file)) {
- Status stat = Status.parseFrom(e.getValue().get());
- if (!stat.getClosed()) {
- log.info("{} wasn't closed", file);
- allReferencedLogsClosed = false;
- }
- }
- }
-
- if (recordsFound > 0 && allReferencedLogsClosed) {
- return;
- }
-
- Thread.sleep(1000);
- }
-
- Assert.fail("We had a file that was referenced but didn't get closed");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index f34b626..db21586 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -19,6 +19,12 @@ package org.apache.accumulo.test.replication;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -31,13 +37,16 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.master.replication.SequentialWorkAssigner;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
@@ -48,7 +57,9 @@ import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +67,20 @@ import org.slf4j.LoggerFactory;
public class ReplicationIT extends ConfigurableMacIT {
private static final Logger log = LoggerFactory.getLogger(ReplicationIT.class);
+ private ExecutorService executor;
+
+ @Before
+ public void createExecutor() {
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @After
+ public void stopExecutor() {
+ if (null != executor) {
+ executor.shutdownNow();
+ }
+ }
+
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setNumTservers(1);
@@ -66,6 +91,7 @@ public class ReplicationIT extends ConfigurableMacIT {
cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
cfg.setProperty(Property.REPLICATION_NAME, "master");
+ cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@@ -79,6 +105,314 @@ public class ReplicationIT extends ConfigurableMacIT {
peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+ peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+ MiniAccumuloClusterImpl peerCluster = peerCfg.build();
+
+ peerCluster.start();
+
+ try {
+ final Connector connMaster = getConnector();
+ final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
+
+ ReplicationTable.create(connMaster);
+
+ String peerUserName = "peer", peerPassword = "foo";
+
+ String peerClusterName = "peer";
+
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+
+ // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+ connMaster.instanceOperations().setProperty(
+ Property.REPLICATION_PEERS.getKey() + peerClusterName,
+ ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+ AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
+
+ final String masterTable = "master", peerTable = "peer";
+
+ connMaster.tableOperations().create(masterTable);
+ String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
+ Assert.assertNotNull(masterTableId);
+
+ connPeer.tableOperations().create(peerTable);
+ String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
+ Assert.assertNotNull(peerTableId);
+
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
+
+ // Replicate this table to the peerClusterName in a table with the peerTableId table id
+ connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
+
+ // Write some data to table1
+ BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
+ for (int rows = 0; rows < 5000; rows++) {
+ Mutation m = new Mutation(Integer.toString(rows));
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ log.info("Wrote all data to master cluster");
+
+// log.debug("");
+// for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+// if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+// log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+// } else {
+// log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+// }
+// }
+
+ final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
+
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.exec(TabletServer.class);
+
+ log.info("TabletServer restarted");
+ for (@SuppressWarnings("unused")
+ Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
+ log.info("TabletServer is online");
+
+ log.info("");
+ log.info("Fetching metadata records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+ if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ } else {
+ log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+ }
+ }
+
+ log.info("");
+ log.info("Fetching replication records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ }
+
+ Future<Boolean> future = executor.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
+ log.info("Drain completed");
+ return true;
+ }
+
+ });
+
+ try {
+ future.get(30, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ future.cancel(true);
+ Assert.fail("Drain did not finish within 30 seconds");
+ }
+
+ log.info("drain completed");
+
+ log.info("");
+ log.info("Fetching metadata records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+ if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ } else {
+ log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+ }
+ }
+
+ log.info("");
+ log.info("Fetching replication records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ }
+
+ Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
+ Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
+ Entry<Key,Value> masterEntry = null, peerEntry = null;
+ while (masterIter.hasNext() && peerIter.hasNext()) {
+ masterEntry = masterIter.next();
+ peerEntry = peerIter.next();
+ Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
+ masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
+ Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
+ }
+
+ log.info("Last master entry: " + masterEntry);
+ log.info("Last peer entry: " + peerEntry);
+
+ Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
+ Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
+ } finally {
+ peerCluster.stop();
+ }
+ }
+
+ @Test(timeout = 60 * 5000)
+ public void dataReplicatedToCorrectTable() throws Exception {
+ MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+ ROOT_PASSWORD);
+ peerCfg.setNumTservers(1);
+ peerCfg.setInstanceName("peer");
+ peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
+ peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+ peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+ peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+ peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+ MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();
+
+ peer1Cluster.start();
+
+ try {
+ Connector connMaster = getConnector();
+ Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD);
+
+ String peerClusterName = "peer";
+ String peerUserName = "peer", peerPassword = "foo";
+
+ // Create local user
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+
+ // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+ connMaster.instanceOperations().setProperty(
+ Property.REPLICATION_PEERS.getKey() + peerClusterName,
+ ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+ AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
+
+ String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
+
+ // Create tables
+ connMaster.tableOperations().create(masterTable1);
+ String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
+ Assert.assertNotNull(masterTableId1);
+
+ connMaster.tableOperations().create(masterTable2);
+ String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
+ Assert.assertNotNull(masterTableId2);
+
+ connPeer.tableOperations().create(peerTable1);
+ String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
+ Assert.assertNotNull(peerTableId1);
+
+ connPeer.tableOperations().create(peerTable2);
+ String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
+ Assert.assertNotNull(peerTableId2);
+
+ // Grant write permission
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
+
+ // Replicate this table to the peerClusterName in a table with the peerTableId table id
+ connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId1);
+
+ connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId2);
+
+ // Write some data to table1
+ BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
+ long masterTable1Records = 0L;
+ for (int rows = 0; rows < 2500; rows++) {
+ Mutation m = new Mutation(masterTable1 + rows);
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ masterTable1Records++;
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ // Write some data to table2
+ bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
+ long masterTable2Records = 0L;
+ for (int rows = 0; rows < 2500; rows++) {
+ Mutation m = new Mutation(masterTable2 + rows);
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ masterTable2Records++;
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ log.info("Wrote all data to master cluster");
+
+ Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
+ masterTable2);
+
+ while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
+ Thread.sleep(500);
+ }
+
+ // Restart the tserver to force a close on the WAL
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.exec(TabletServer.class);
+
+ log.info("Restarted the tserver");
+
+ // Read the data -- the tserver is back up and running
+ for (@SuppressWarnings("unused")
+ Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {}
+
+ // Wait for both tables to be replicated
+ log.info("Waiting for {} for {}", filesFor1, masterTable1);
+ connMaster.replicationOperations().drain(masterTable1, filesFor1);
+
+ log.info("Waiting for {} for {}", filesFor2, masterTable2);
+ connMaster.replicationOperations().drain(masterTable2, filesFor2);
+
+ long countTable = 0L;
+ for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
+ countTable++;
+ Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+ .startsWith(masterTable1));
+ }
+
+ log.info("Found {} records in {}", countTable, peerTable1);
+ Assert.assertEquals(masterTable1Records, countTable);
+
+ countTable = 0L;
+ for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
+ countTable++;
+ Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+ .startsWith(masterTable2));
+ }
+
+ log.info("Found {} records in {}", countTable, peerTable2);
+ Assert.assertEquals(masterTable2Records, countTable);
+
+ } finally {
+ peer1Cluster.stop();
+ }
+ }
+
+ @Test(timeout = 60 * 5000)
+ public void dataWasReplicatedToThePeerWithoutDrain() throws Exception {
+ MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+ ROOT_PASSWORD);
+ peerCfg.setNumTservers(1);
+ peerCfg.setInstanceName("peer");
+ peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
+ peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+ peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+ peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
MiniAccumuloClusterImpl peerCluster = peerCfg.build();
peerCluster.start();
@@ -169,7 +503,7 @@ public class ReplicationIT extends ConfigurableMacIT {
}
@Test(timeout = 60 * 5000)
- public void dataReplicatedToCorrectTable() throws Exception {
+ public void dataReplicatedToCorrectTableWithoutDrain() throws Exception {
MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
ROOT_PASSWORD);
peerCfg.setNumTservers(1);
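
The dataWasReplicatedToThePeer test above bounds the blocking drain() call by submitting it to the single-thread executor and waiting on the Future with a timeout. The same pattern, reduced to a generic self-contained helper (class and method names are hypothetical):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWait {
  // Run a blocking call on a worker thread and fail fast if it stalls.
  static void awaitOrFail(Runnable blockingCall, long seconds) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<?> future = executor.submit(blockingCall);
      try {
        future.get(seconds, TimeUnit.SECONDS);
      } catch (TimeoutException e) {
        future.cancel(true); // interrupt the stuck call
        throw new AssertionError("Call did not finish within " + seconds + " seconds");
      }
    } finally {
      executor.shutdownNow();
    }
  }
}
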
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
deleted file mode 100644
index 0afbc05..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Iterables;
-import com.google.common.net.HostAndPort;
-
-/**
- *
- */
-public class ReplicationPortAdvertisementIT extends ConfigurableMacIT {
- private static final Logger log = LoggerFactory.getLogger(ReplicationPortAdvertisementIT.class);
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(2);
- }
-
- @Test
- public void tserverReplicationServicePortsAreAdvertised() throws Exception {
- // Wait for the cluster to be up
- Connector conn = getConnector();
- Instance inst = conn.getInstance();
-
- // Wait for a tserver to come up to fulfill this request
- conn.tableOperations().create("foo");
- Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
- Assert.assertEquals(0, Iterables.size(s));
-
- ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
- Set<String> tserverHost = new HashSet<>();
- tserverHost.addAll(zreader.getChildren(ZooUtil.getRoot(inst) + Constants.ZTSERVERS));
-
- Set<HostAndPort> replicationServices = new HashSet<>();
-
- for (String tserver : tserverHost) {
- try {
- byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS + "/" + tserver, null);
- HostAndPort replAddress = HostAndPort.fromString(new String(portData, StandardCharsets.UTF_8));
- replicationServices.add(replAddress);
- } catch (Exception e) {
- log.error("Could not find port for {}", tserver, e);
- Assert.fail("Did not find replication port advertisement for " + tserver);
- }
- }
-
- // Each tserver should also have equial replicaiton services running internally
- Assert.assertEquals("Expected an equal number of replication servicers and tservers", tserverHost.size(), replicationServices.size());
- }
-
- @Test
- public void masterReplicationServicePortsAreAdvertised() throws Exception {
- // Wait for the cluster to be up
- Connector conn = getConnector();
- Instance inst = conn.getInstance();
-
- // Wait for a tserver to come up to fulfill this request
- conn.tableOperations().create("foo");
- Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
- Assert.assertEquals(0, Iterables.size(s));
-
- ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
-
- // Should have one master instance
- Assert.assertEquals(1, inst.getMasterLocations().size());
-
- // Get the master thrift service addr
- String masterAddr = Iterables.getOnlyElement(inst.getMasterLocations());
-
- // Get the master replication coordinator addr
- String replCoordAddr = new String(zreader.getData(ZooUtil.getRoot(inst) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, null), StandardCharsets.UTF_8);
-
- // They shouldn't be the same
- Assert.assertNotEquals(masterAddr, replCoordAddr);
-
- // Neither should be zero as the port
- Assert.assertNotEquals(0, HostAndPort.fromString(masterAddr).getPort());
- Assert.assertNotEquals(0, HostAndPort.fromString(replCoordAddr).getPort());
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
deleted file mode 100644
index c7c36e8..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.master.replication.SequentialWorkAssigner;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.accumulo.tserver.TabletServer;
-import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ReplicationSequentialIT extends ConfigurableMacIT {
- private static final Logger log = LoggerFactory.getLogger(ReplicationSequentialIT.class);
-
- private ExecutorService executor;
-
- @Before
- public void createExecutor() {
- executor = Executors.newSingleThreadExecutor();
- }
-
- @After
- public void stopExecutor() {
- if (null != executor) {
- executor.shutdownNow();
- }
- }
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
- cfg.setProperty(Property.GC_CYCLE_START, "1s");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
- cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
- cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
- cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
- cfg.setProperty(Property.REPLICATION_NAME, "master");
- cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- @Test(timeout = 60 * 5000)
- public void dataWasReplicatedToThePeer() throws Exception {
- MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
- ROOT_PASSWORD);
- peerCfg.setNumTservers(1);
- peerCfg.setInstanceName("peer");
- peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
- peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
- peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
- MiniAccumuloClusterImpl peerCluster = peerCfg.build();
-
- peerCluster.start();
-
- try {
- final Connector connMaster = getConnector();
- final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
-
- ReplicationTable.create(connMaster);
-
- String peerUserName = "peer", peerPassword = "foo";
-
- String peerClusterName = "peer";
-
- connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
-
- connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
- connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
-
- // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
- connMaster.instanceOperations().setProperty(
- Property.REPLICATION_PEERS.getKey() + peerClusterName,
- ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
- AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
-
- final String masterTable = "master", peerTable = "peer";
-
- connMaster.tableOperations().create(masterTable);
- String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
- Assert.assertNotNull(masterTableId);
-
- connPeer.tableOperations().create(peerTable);
- String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
- Assert.assertNotNull(peerTableId);
-
- connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
-
- // Replicate this table to the peerClusterName in a table with the peerTableId table id
- connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
- connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
-
- // Write some data to table1
- BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
- for (int rows = 0; rows < 5000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 100; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- log.info("Wrote all data to master cluster");
-
- log.debug("");
- for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
- if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
- log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
- } else {
- log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
- }
- }
-
- final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
-
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
- cluster.killProcess(ServerType.TABLET_SERVER, proc);
- }
- cluster.exec(TabletServer.class);
-
- log.info("TabletServer restarted");
- for (@SuppressWarnings("unused")
- Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
- log.info("TabletServer is online");
-
- log.info("");
- log.info("Fetching metadata records:");
- for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
- if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
- log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
- } else {
- log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
- }
- }
-
- log.info("");
- log.info("Fetching replication records:");
- for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
- log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
- }
-
- Future<Boolean> future = executor.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() throws Exception {
- connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
- log.info("Drain completed");
- return true;
- }
-
- });
-
- try {
- future.get(30, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- future.cancel(true);
- Assert.fail("Drain did not finish within 30 seconds");
- }
-
- log.info("drain completed");
-
- log.info("");
- log.info("Fetching metadata records:");
- for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
- if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
- log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
- } else {
- log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
- }
- }
-
- log.info("");
- log.info("Fetching replication records:");
- for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
- log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
- }
-
- Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
- Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
- Entry<Key,Value> masterEntry = null, peerEntry = null;
- while (masterIter.hasNext() && peerIter.hasNext()) {
- masterEntry = masterIter.next();
- peerEntry = peerIter.next();
- Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
- masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
- Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
- }
-
- log.info("Last master entry: " + masterEntry);
- log.info("Last peer entry: " + peerEntry);
-
- Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
- Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
- } finally {
- peerCluster.stop();
- }
- }
-
- @Test(timeout = 5 * 60 * 1000)
- public void dataReplicatedToCorrectTable() throws Exception {
- MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
- ROOT_PASSWORD);
- peerCfg.setNumTservers(1);
- peerCfg.setInstanceName("peer");
- peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
- peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
- peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
- MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();
-
- peer1Cluster.start();
-
- try {
- Connector connMaster = getConnector();
- Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD);
-
- String peerClusterName = "peer";
- String peerUserName = "peer", peerPassword = "foo";
-
- // Create local user
- connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
-
- connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
- connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
-
- // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
- connMaster.instanceOperations().setProperty(
- Property.REPLICATION_PEERS.getKey() + peerClusterName,
- ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
- AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
-
- String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
-
- // Create tables
- connMaster.tableOperations().create(masterTable1);
- String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
- Assert.assertNotNull(masterTableId1);
-
- connMaster.tableOperations().create(masterTable2);
- String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
- Assert.assertNotNull(masterTableId2);
-
- connPeer.tableOperations().create(peerTable1);
- String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
- Assert.assertNotNull(peerTableId1);
-
- connPeer.tableOperations().create(peerTable2);
- String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
- Assert.assertNotNull(peerTableId2);
-
- // Grant write permission
- connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
- connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
-
- // Replicate this table to the peerClusterName in a table with the peerTableId table id
- connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
- connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId1);
-
- connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
- connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId2);
-
- // Write some data to table1
- BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
- long masterTable1Records = 0L;
- for (int rows = 0; rows < 2500; rows++) {
- Mutation m = new Mutation(masterTable1 + rows);
- for (int cols = 0; cols < 100; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- masterTable1Records++;
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Write some data to table2
- bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
- long masterTable2Records = 0L;
- for (int rows = 0; rows < 2500; rows++) {
- Mutation m = new Mutation(masterTable2 + rows);
- for (int cols = 0; cols < 100; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- masterTable2Records++;
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- log.info("Wrote all data to master cluster");
-
- Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
- masterTable2);
-
- while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
- Thread.sleep(500);
- }
-
- // Restart the tserver to force a close on the WAL
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
- cluster.killProcess(ServerType.TABLET_SERVER, proc);
- }
- cluster.exec(TabletServer.class);
-
- log.info("Restarted the tserver");
-
- // Read the data -- the tserver is back up and running
- for (@SuppressWarnings("unused")
- Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {}
-
- // Wait for both tables to be replicated
- log.info("Waiting for {} for {}", filesFor1, masterTable1);
- connMaster.replicationOperations().drain(masterTable1, filesFor1);
-
- log.info("Waiting for {} for {}", filesFor2, masterTable2);
- connMaster.replicationOperations().drain(masterTable2, filesFor2);
-
- long countTable = 0L;
- for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
- countTable++;
- Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
- .startsWith(masterTable1));
- }
-
- log.info("Found {} records in {}", countTable, peerTable1);
- Assert.assertEquals(masterTable1Records, countTable);
-
- countTable = 0L;
- for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
- countTable++;
- Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
- .startsWith(masterTable2));
- }
-
- log.info("Found {} records in {}", countTable, peerTable2);
- Assert.assertEquals(masterTable2Records, countTable);
-
- } finally {
- peer1Cluster.stop();
- }
- }
-}
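The drain-and-verify pattern above (submit drain() to an executor, fail the test if it does not return within a bound) recurs throughout these ITs. A minimal sketch of that pattern as a reusable helper follows; the DrainWithTimeout class and drainOrFail method are illustrative names, not code from this commit:

    import java.util.Set;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.accumulo.core.client.Connector;

    public class DrainWithTimeout {

      /**
       * Blocks until the given files have been fully replicated for the table, failing loudly
       * if the drain does not complete within the timeout.
       */
      public static void drainOrFail(final Connector conn, final String table, final Set<String> files, long timeoutSecs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
          Future<Boolean> future = executor.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
              // drain() returns only when every referenced file is fully replicated
              conn.replicationOperations().drain(table, files);
              return true;
            }
          });
          try {
            future.get(timeoutSecs, TimeUnit.SECONDS);
          } catch (TimeoutException e) {
            future.cancel(true);
            throw new AssertionError("Drain did not finish within " + timeoutSecs + " seconds");
          }
        } finally {
          executor.shutdownNow();
        }
      }
    }

With such a helper, the block above reduces to a single call like drainOrFail(connMaster, masterTable, filesNeedingReplication, 30).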
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
deleted file mode 100644
index 62c09f5..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * Integration tests that evaluate the accuracy of the internal replication bookkeeping performed on the Accumulo "master" instance. These tests do not send
- * data to any remote instance; they merely track what is stored locally.
- */
-public class ReplicationSourceOnlyIT extends ConfigurableMacIT {
- @Override
- public int defaultTimeoutSeconds() {
- return 300;
- }
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0");
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
- cfg.setProperty(Property.GC_CYCLE_START, "1s");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
- Multimap<String,String> logs = HashMultimap.create();
- Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- scanner.fetchColumnFamily(LogColumnFamily.NAME);
- scanner.setRange(new Range());
- for (Entry<Key,Value> entry : scanner) {
- if (Thread.interrupted()) {
- return logs;
- }
-
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
- for (String log : logEntry.logSet) {
- // Need to normalize the log file from LogEntry
- logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
- }
- }
- return logs;
- }
-
- @Test
- public void replicationEntriesPrecludeWalDeletion() throws Exception {
- final Connector conn = getConnector();
- String table1 = "table1", table2 = "table2", table3 = "table3";
- final Multimap<String,String> logs = HashMultimap.create();
- final AtomicBoolean keepRunning = new AtomicBoolean(true);
-
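- // Background thread that continuously records which WALs the metadata table references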
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- // Ideally we would interrupt this thread, but interrupting a Scanner mid-read causes it
- // to log spurious errors
- while (keepRunning.get()) {
- try {
- logs.putAll(getLogs(conn));
- } catch (TableNotFoundException e) {
- log.error("Metadata table doesn't exist");
- }
- }
- }
-
- });
-
- t.start();
-
- conn.tableOperations().create(table1);
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
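- // Give the new table properties a moment to propagate to the tservers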
- Thread.sleep(1000);
-
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- conn.tableOperations().create(table2);
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- Thread.sleep(1000);
-
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- conn.tableOperations().create(table3);
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- Thread.sleep(1000);
-
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Force a write to metadata for the data written
- for (String table : Arrays.asList(table1, table2, table3)) {
- conn.tableOperations().flush(table, null, null, true);
- }
-
- keepRunning.set(false);
- t.join(5000);
-
- Scanner s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Set<String> replFiles = new HashSet<>();
- for (Entry<Key,Value> entry : s) {
- replFiles.add(entry.getKey().getRow().toString());
- }
-
- // We might have a WAL that was used solely for the replication table
- // We want to remove that from our list as it should not appear in the replication table
- String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME);
- Iterator<Entry<String,String>> observedLogs = logs.entries().iterator();
- while (observedLogs.hasNext()) {
- Entry<String,String> observedLog = observedLogs.next();
- if (replicationTableId.equals(observedLog.getValue())) {
- observedLogs.remove();
- }
- }
-
- // We should have *some* reference to each log that was seen in the metadata table
- // They might not all be closed yet, though (a newly created WAL may still be open)
- Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
-
- for (String replFile : replFiles) {
- Path p = new Path(replFile);
- FileSystem fs = p.getFileSystem(new Configuration());
- Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
- }
- }
-}
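The test above repeats the same enable-replication sequence for each of its three tables. Under the assumption of a small helper (the class and method names below are illustrative), that setup reduces to:

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.conf.Property;

    public class ReplicatedTableSetup {

      /**
       * Creates a table and marks it for replication to the named peer; targetId is the
       * identifier of the destination table on that peer.
       */
      public static void createReplicatedTable(Connector conn, String table, String peerName, String targetId) throws Exception {
        conn.tableOperations().create(table);
        // Turn on replication for the table...
        conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
        // ...and point it at the peer
        conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGETS.getKey() + peerName, targetId);
      }
    }

For example, createReplicatedTable(conn, table1, "cluster1", "1") for each of table1, table2 and table3.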
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
deleted file mode 100644
index da874fa..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.util.ReplicationTableUtil;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Iterables;
-
-/**
- * Verifies that the status combiner configured on the metadata table correctly merges {@link Status} protobufs in the replication section.
- */
-public class ReplicationTablesMacTest extends ConfigurableMacIT {
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- }
-
- @Test
- public void combinerWorksOnMetadata() throws Exception {
- Connector conn = getConnector();
-
- conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
-
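- // Set up the replication section of the metadata table, including the status combiner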
- ReplicationTableUtil.configureMetadataTable(conn, MetadataTable.NAME);
-
- Status stat1 = StatusUtil.fileCreated(100);
- Status stat2 = StatusUtil.fileClosed();
-
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
- m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat1));
- bw.addMutation(m);
- bw.close();
-
- Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
-
- Status actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
- Assert.assertEquals(stat1, actual);
-
- bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
- m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat2));
- bw.addMutation(m);
- bw.close();
-
- s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
-
- actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
- Status expected = Status.newBuilder().setBegin(0).setEnd(0).setClosed(true).setInfiniteEnd(true).setCreatedTime(100).build();
-
- Assert.assertEquals(expected, actual);
- }
-
-}
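To make the combiner assertion above concrete, here is a standalone sketch of the merge the test expects (the class name is illustrative; StatusUtil and Status are the same classes the test uses): a "created" status carrying only a creation time, combined with a "closed" status, should yield a single closed, infinite-ended record that preserves the creation time.

    import org.apache.accumulo.core.replication.StatusUtil;
    import org.apache.accumulo.core.replication.proto.Replication.Status;

    public class StatusCombineExpectation {

      public static void main(String[] args) {
        // Status written when the WAL was first created: only createdTime is set
        Status created = StatusUtil.fileCreated(100);

        // Status written when the WAL was closed: closed, with an infinite end
        Status closed = StatusUtil.fileClosed();

        // The combiner on the metadata table should merge the two into one record
        // that is both closed and infinite-ended while retaining the created time
        Status expected = Status.newBuilder().setBegin(0).setEnd(0).setClosed(true).setInfiniteEnd(true).setCreatedTime(100).build();

        System.out.println("created  = " + created);
        System.out.println("closed   = " + closed);
        System.out.println("expected = " + expected);
      }
    }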