You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/09/16 22:15:54 UTC
[4/4] git commit: ACCUMULO-3130 Ensure MultiInstanceReplicationIT is
running with SSL and credential providers
ACCUMULO-3130 Ensure MultiInstanceReplicationIT is running with SSL and credential providers
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6e205e0c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6e205e0c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6e205e0c
Branch: refs/heads/master
Commit: 6e205e0c794c88b23c3e2a46baa3fdd0167ca5a3
Parents: e3b8ec5
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 16 01:29:01 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 16 16:15:34 2014 -0400
----------------------------------------------------------------------
.../accumulo/test/functional/AbstractMacIT.java | 2 +-
.../test/functional/ConfigurableMacIT.java | 2 +-
.../accumulo/test/functional/ExamplesIT.java | 8 +-
.../accumulo/test/functional/SimpleMacIT.java | 2 +-
.../replication/MultiInstanceReplicationIT.java | 141 ++++++++++++-------
5 files changed, 94 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e205e0c/test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java b/test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java
index 22e46ff..ce6164b 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java
@@ -81,7 +81,7 @@ public abstract class AbstractMacIT {
return names;
}
- protected static void configureForEnvironment(MiniAccumuloConfigImpl cfg, Class<?> testClass, File folder) {
+ protected static void configureForEnvironment(MiniAccumuloConfigImpl cfg, File folder) {
if ("true".equals(System.getProperty("org.apache.accumulo.test.functional.useSslForIT"))) {
configureForSsl(cfg, folder);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e205e0c/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
index 59b0977..67869e9 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
@@ -54,7 +54,7 @@ public class ConfigurableMacIT extends AbstractMacIT {
Configuration coreSite = new Configuration(false);
configure(cfg, coreSite);
cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, Boolean.TRUE.toString());
- configureForEnvironment(cfg, getClass(), createSharedTestDir(this.getClass().getName() + "-ssl"));
+ configureForEnvironment(cfg, createSharedTestDir(this.getClass().getName() + "-ssl"));
cluster = new MiniAccumuloClusterImpl(cfg);
if (coreSite.size() > 0) {
File csFile = new File(cluster.getConfig().getConfDir(), "core-site.xml");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e205e0c/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
index 7864ec8..b5d96d6 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
@@ -114,7 +114,7 @@ public class ExamplesIT extends AbstractMacIT {
cfg.setDefaultMemory(cfg.getDefaultMemory() * 2, MemoryUnit.BYTE);
cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, Boolean.TRUE.toString());
- configureForEnvironment(cfg, ExamplesIT.class, createSharedTestDir(ExamplesIT.class.getName() + "-ssl"));
+ configureForEnvironment(cfg, createSharedTestDir(ExamplesIT.class.getName() + "-ssl"));
cluster = new MiniAccumuloClusterImpl(cfg);
cluster.start();
@@ -228,18 +228,18 @@ public class ExamplesIT extends AbstractMacIT {
bw.addMutation(m);
bw.flush();
-
+
Iterator<Entry<Key, Value>> iter = c.createScanner(table, Authorizations.EMPTY).iterator();
assertTrue("Iterator had no results", iter.hasNext());
Entry<Key, Value> e = iter.next();
assertEquals("Results ", "1,3,4,2", e.getValue().toString());
assertFalse("Iterator had additional results", iter.hasNext());
-
+
m = new Mutation("foo");
m.put("a", "b", "0,20,20,2");
bw.addMutation(m);
bw.close();
-
+
iter = c.createScanner(table, Authorizations.EMPTY).iterator();
assertTrue("Iterator had no results", iter.hasNext());
e = iter.next();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e205e0c/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java b/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java
index b166ffd..3e11653 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java
@@ -55,7 +55,7 @@ public class SimpleMacIT extends AbstractMacIT {
MiniAccumuloConfigImpl cfg = new MiniAccumuloConfigImpl(folder, ROOT_PASSWORD);
cfg.setNativeLibPaths(NativeMapIT.nativeMapLocation().getAbsolutePath());
cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, Boolean.TRUE.toString());
- configureForEnvironment(cfg, SimpleMacIT.class, createSharedTestDir(SimpleMacIT.class.getName() + "-ssl"));
+ configureForEnvironment(cfg, createSharedTestDir(SimpleMacIT.class.getName() + "-ssl"));
cluster = new MiniAccumuloClusterImpl(cfg);
cluster.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e205e0c/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
index 3c6da2e..fcab23b 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
@@ -16,7 +16,9 @@
*/
package org.apache.accumulo.test.replication;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -40,13 +42,14 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.master.replication.SequentialWorkAssigner;
+import org.apache.accumulo.minicluster.MemoryUnit;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
@@ -96,20 +99,60 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
cfg.setProperty(Property.REPLICATION_NAME, "master");
cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
+ cfg.setMemory(ServerType.MASTER, 1, MemoryUnit.GIGABYTE);
+ cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE);
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
+ /**
+ * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the other MAC used for replication
+ */
+ private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) {
+ // Set the same SSL information from the primary when present
+ Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig();
+ if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
+ Map<String,String> peerSiteConfig = new HashMap<String,String>();
+ peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true");
+ String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey());
+ Assert.assertNotNull("Keystore Path was null", keystorePath);
+ peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath);
+ String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey());
+ Assert.assertNotNull("Truststore Path was null", truststorePath);
+ peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath);
+
+ // Passwords might be stored in CredentialProvider
+ String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey());
+ if (null != keystorePassword) {
+ peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword);
+ }
+ String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey());
+ if (null != truststorePassword) {
+ peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword);
+ }
+
+ System.out.println("Setting site configuration for peer " + peerSiteConfig);
+ peerCfg.setSiteConfig(peerSiteConfig);
+ }
+
+ // Use the CredentialProvider if the primary also uses one
+ String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey());
+ if (null != credProvider) {
+ Map<String,String> peerSiteConfig = peerCfg.getSiteConfig();
+ peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider);
+ peerCfg.setSiteConfig(peerSiteConfig);
+ }
+ }
+
@Test(timeout = 60 * 5000)
public void dataWasReplicatedToThePeer() throws Exception {
MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
ROOT_PASSWORD);
peerCfg.setNumTservers(1);
peerCfg.setInstanceName("peer");
- peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
- peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+
+ updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
+
MiniAccumuloClusterImpl peerCluster = peerCfg.build();
peerCluster.start();
@@ -117,40 +160,40 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
try {
final Connector connMaster = getConnector();
final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
-
+
ReplicationTable.create(connMaster);
String peerUserName = "peer", peerPassword = "foo";
-
+
String peerClusterName = "peer";
connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
-
+
connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
-
+
// ...peer = AccumuloReplicaSystem,instanceName,zookeepers
connMaster.instanceOperations().setProperty(
Property.REPLICATION_PEERS.getKey() + peerClusterName,
ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
-
+
final String masterTable = "master", peerTable = "peer";
-
+
connMaster.tableOperations().create(masterTable);
String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
Assert.assertNotNull(masterTableId);
-
+
connPeer.tableOperations().create(peerTable);
String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
Assert.assertNotNull(peerTableId);
connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
-
+
// Replicate this table to the peerClusterName in a table with the peerTableId table id
connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId);
-
+
// Write some data to table1
BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
for (int rows = 0; rows < 5000; rows++) {
@@ -161,32 +204,23 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
}
bw.addMutation(m);
}
-
+
bw.close();
-
+
log.info("Wrote all data to master cluster");
-
-// log.debug("");
-// for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
-// if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
-// log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-// } else {
-// log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
-// }
-// }
-
+
final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
-
+
for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
cluster.killProcess(ServerType.TABLET_SERVER, proc);
}
cluster.exec(TabletServer.class);
-
+
log.info("TabletServer restarted");
for (@SuppressWarnings("unused")
Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
log.info("TabletServer is online");
-
+
log.info("");
log.info("Fetching metadata records:");
for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
@@ -196,33 +230,33 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
}
}
-
+
log.info("");
log.info("Fetching replication records:");
for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
}
-
+
Future<Boolean> future = executor.submit(new Callable<Boolean>() {
-
+
@Override
public Boolean call() throws Exception {
connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
log.info("Drain completed");
return true;
}
-
+
});
-
+
try {
- future.get(30, TimeUnit.SECONDS);
+ future.get(60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
- Assert.fail("Drain did not finish within 30 seconds");
+ Assert.fail("Drain did not finish within 60 seconds");
}
-
+
log.info("drain completed");
-
+
log.info("");
log.info("Fetching metadata records:");
for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
@@ -232,13 +266,13 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
}
}
-
+
log.info("");
log.info("Fetching replication records:");
for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
}
-
+
Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
Entry<Key,Value> masterEntry = null, peerEntry = null;
@@ -249,10 +283,10 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
}
-
+
log.info("Last master entry: " + masterEntry);
log.info("Last peer entry: " + peerEntry);
-
+
Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
} finally {
@@ -266,11 +300,10 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
ROOT_PASSWORD);
peerCfg.setNumTservers(1);
peerCfg.setInstanceName("peer");
- peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
- peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+
+ updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
+
MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();
peer1Cluster.start();
@@ -413,10 +446,10 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
ROOT_PASSWORD);
peerCfg.setNumTservers(1);
peerCfg.setInstanceName("peer");
- peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
- peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+
+ updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
+
MiniAccumuloClusterImpl peerCluster = peerCfg.build();
peerCluster.start();
@@ -512,10 +545,10 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
ROOT_PASSWORD);
peerCfg.setNumTservers(1);
peerCfg.setInstanceName("peer");
- peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
- peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+
+ updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
+
MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();
peer1Cluster.start();
@@ -636,7 +669,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
.startsWith(masterTable1));
}
-
+
log.info("Found {} records in {}", countTable, peerTable1);
if (0l == countTable) {
@@ -657,7 +690,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
.startsWith(masterTable2));
}
-
+
log.info("Found {} records in {}", countTable, peerTable2);
if (0l == countTable) {