You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/14 06:55:36 UTC
[36/51] [abbrv] git commit: ACCUMULO-378 More rb changes.
ACCUMULO-378 More rb changes.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ad4ea6b2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ad4ea6b2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ad4ea6b2
Branch: refs/heads/master
Commit: ad4ea6b248f49d5c79390d2e99f9804c0606ded3
Parents: 856f235
Author: Josh Elser <el...@apache.org>
Authored: Thu Jun 5 01:31:40 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 5 01:31:40 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/core/Constants.java | 5 ----
.../client/admin/ReplicationOperations.java | 21 ++++++++++++---
.../core/client/impl/ReplicationClient.java | 1 -
.../client/impl/ReplicationOperationsImpl.java | 8 ++++++
.../core/replication/ReplicationConstants.java | 28 ++++++++++++++++++++
.../apache/accumulo/server/init/Initialize.java | 5 ++--
.../DistributedWorkQueueWorkAssigner.java | 4 +--
.../MasterReplicationCoordinator.java | 4 +--
.../replication/SequentialWorkAssigner.java | 4 +--
.../replication/UnorderedWorkAssigner.java | 4 +--
.../replication/SequentialWorkAssignerTest.java | 6 ++---
.../replication/UnorderedWorkAssignerTest.java | 5 ++--
.../monitor/servlets/ReplicationServlet.java | 4 +--
.../apache/accumulo/tserver/TabletServer.java | 3 ++-
.../tserver/replication/ReplicationWorker.java | 6 ++---
.../replication/MultiTserverReplicationIT.java | 3 ++-
16 files changed, 80 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 795d6fe..3edba81 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -114,9 +114,4 @@ public class Constants {
public static final String[] PATH_PROPERTY_ENV_VARS = new String[] {"ACCUMULO_HOME", "ACCUMULO_CONF_DIR"};
public static final String HDFS_TABLES_DIR = "/tables";
-
- // Constants for replication information in zookeeper
- public static final String ZREPLICATION = "/replication";
- public static final String ZREPLICATION_WORK_QUEUE = ZREPLICATION + "/workqueue";
- public static final String ZREPLICATION_TSERVERS = ZREPLICATION + "/tservers";
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
index 5873f73..9267f67 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
@@ -36,6 +36,13 @@ public interface ReplicationOperations {
* @param name Name of the cluster, used for configuring replication on tables
* @param system Type of system to be replicated to
*/
+ public void addPeer(String name, Class<? extends ReplicaSystem> system) throws AccumuloException, AccumuloSecurityException, PeerExistsException;
+
+ /**
+ * Defines a cluster with the given name using the given {@link ReplicaSystem}.
+ * @param name Name of the cluster, used for configuring replication on tables
+ * @param system Type of system to be replicated to
+ */
public void addPeer(String name, ReplicaSystem system) throws AccumuloException, AccumuloSecurityException, PeerExistsException;
/**
@@ -54,7 +61,8 @@ public interface ReplicationOperations {
public void removePeer(String name) throws AccumuloException, AccumuloSecurityException, PeerNotFoundException;
/**
- * Waits for a table to be fully replicated.
+ * Waits for a table to be fully replicated, given the state of files pending replication for the provided table
+ * at the point in time at which this method is invoked.
* @param tableName The table to wait for
* @throws AccumuloException
* @throws AccumuloSecurityException
@@ -62,7 +70,9 @@ public interface ReplicationOperations {
public void drain(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
/**
- * Waits for a table to be fully replicated as determined by the provided tables.
+ * Given the provided set of files that are pending replication for a table, wait for those
+ * files to be fully replicated to all configured peers. This allows for the accurate calculation
+ * of when a table, at a given point in time, has been fully replicated.
* @param tableName The table to wait for
* @throws AccumuloException
* @throws AccumuloSecurityException
@@ -70,7 +80,12 @@ public interface ReplicationOperations {
public void drain(String tableName, Set<String> files) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
/**
- * Gets all of the referenced files for a table.
+ * Gets all of the referenced files for a table from the metadata table. The result of this method
+ * is intended to be directly supplied to {@link #drain(String, Set)}. This helps determine when all
+ * data from a given point in time has been fully replicated.
+ * <p>
+ * This also allows callers to get the {@link Set} of files for a table at some time, and later provide that
+ * {@link Set} to {@link #drain(String,Set)} to wait for all of those files to be replicated.
* @param tableName
* @throws TableNotFoundException
*/
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index 13c027a..8f15839 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -18,7 +18,6 @@ package org.apache.accumulo.core.client.impl;
import static com.google.common.base.Preconditions.checkNotNull;
-import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 51a5367..8c50ecc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -77,6 +77,14 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
}
@Override
+ public void addPeer(String name, Class<? extends ReplicaSystem> system) throws AccumuloException, AccumuloSecurityException, PeerExistsException {
+ checkNotNull(name);
+ checkNotNull(system);
+
+ addPeer(name, system.getName());
+ }
+
+ @Override
public void addPeer(String name, ReplicaSystem system) throws AccumuloException, AccumuloSecurityException, PeerExistsException {
checkNotNull(name);
checkNotNull(system);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
new file mode 100644
index 0000000..517920c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.replication;
+
+/**
+ * Constants for the ZooKeeper paths under which replication information is stored.
+ */
+public class ReplicationConstants {
+ // Constants for replication information in zookeeper
+ public static final String ZOO_BASE = "/replication";
+ public static final String ZOO_WORK_QUEUE = ZOO_BASE + "/workqueue";
+ public static final String ZOO_TSERVERS = ZOO_BASE + "/tservers";
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 1d9fb86..78fa4e7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.security.SecurityUtil;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.volume.VolumeConfiguration;
@@ -480,8 +481,8 @@ public class Initialize {
zoo.putPersistentData(zkInstanceRoot + Constants.ZRECOVERY, ZERO_CHAR_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
- zoo.putPersistentData(zkInstanceRoot + Constants.ZREPLICATION, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
- zoo.putPersistentData(zkInstanceRoot + Constants.ZREPLICATION_TSERVERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_BASE, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_TSERVERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
}
private static String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
index 4815305..234b153 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
@@ -20,7 +20,6 @@ import java.util.Collection;
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -30,6 +29,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTarget;
@@ -114,7 +114,7 @@ public abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner {
* @param conf
*/
protected void initializeWorkQueue(AccumuloConfiguration conf) {
- workQueue = new DistributedWorkQueue(ZooUtil.getRoot(conn.getInstance()) + Constants.ZREPLICATION_WORK_QUEUE, conf);
+ workQueue = new DistributedWorkQueue(ZooUtil.getRoot(conn.getInstance()) + ReplicationConstants.ZOO_WORK_QUEUE, conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
index 974aaa9..11c04be 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
@@ -21,9 +21,9 @@ import java.util.Iterator;
import java.util.Random;
import java.util.Set;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinatorErrorCode;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinatorException;
@@ -83,7 +83,7 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac
TServerInstance tserver = getRandomTServer(tservers, rand.nextInt(tservers.size()));
String replServiceAddr;
try {
- replServiceAddr = new String(reader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS + "/" + tserver.hostPort(), null), StandardCharsets.UTF_8);
+ replServiceAddr = new String(reader.getData(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_TSERVERS + "/" + tserver.hostPort(), null), StandardCharsets.UTF_8);
} catch (KeeperException | InterruptedException e) {
log.error("Could not fetch repliation service port for tserver", e);
throw new ReplicationCoordinatorException(ReplicationCoordinatorErrorCode.SERVICE_CONFIGURATION_UNAVAILABLE,
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index e56763e..4b2936c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -24,9 +24,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
@@ -137,7 +137,7 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
// tableID -> workKey
Entry<String,String> entry = iter.next();
// Null equates to the work for this target was finished
- if (null == zooCache.get(ZooUtil.getRoot(instanceId) + Constants.ZREPLICATION_WORK_QUEUE + "/" + entry.getValue())) {
+ if (null == zooCache.get(ZooUtil.getRoot(instanceId) + ReplicationConstants.ZOO_WORK_QUEUE + "/" + entry.getValue())) {
log.debug("Removing {} from work assignment state", entry.getValue());
iter.remove();
elementsRemoved++;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
index b6706ef..9042e2d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java
@@ -20,9 +20,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -133,7 +133,7 @@ public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner {
while (work.hasNext()) {
String filename = work.next();
// Null equates to the work was finished
- if (null == zooCache.get(ZooUtil.getRoot(instanceId) + Constants.ZREPLICATION_WORK_QUEUE + "/" + filename)) {
+ if (null == zooCache.get(ZooUtil.getRoot(instanceId) + ReplicationConstants.ZOO_WORK_QUEUE + "/" + filename)) {
work.remove();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index e7ff4ca..3d96ea1 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
@@ -35,6 +34,7 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTarget;
@@ -329,11 +329,11 @@ public class SequentialWorkAssignerTest {
// file1 replicated
expect(
- zooCache.get(ZooUtil.getRoot("instance") + Constants.ZREPLICATION_WORK_QUEUE + "/"
+ zooCache.get(ZooUtil.getRoot("instance") + ReplicationConstants.ZOO_WORK_QUEUE + "/"
+ DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1", "1", "1")))).andReturn(null);
// file2 still needs to replicate
expect(
- zooCache.get(ZooUtil.getRoot("instance") + Constants.ZREPLICATION_WORK_QUEUE + "/"
+ zooCache.get(ZooUtil.getRoot("instance") + ReplicationConstants.ZOO_WORK_QUEUE + "/"
+ DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")))).andReturn(new byte[0]);
replay(workQueue, zooCache, conn, inst);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
index 0c9384e..2199808 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTarget;
@@ -238,8 +239,8 @@ public class UnorderedWorkAssignerTest {
expect(conn.getInstance()).andReturn(inst);
expect(inst.getInstanceID()).andReturn("id");
- expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION_WORK_QUEUE + "/wal1")).andReturn(null);
- expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION_WORK_QUEUE + "/wal2")).andReturn(null);
+ expect(cache.get(Constants.ZROOT + "/id" + ReplicationConstants.ZOO_WORK_QUEUE + "/wal1")).andReturn(null);
+ expect(cache.get(Constants.ZROOT + "/id" + ReplicationConstants.ZOO_WORK_QUEUE + "/wal2")).andReturn(null);
replay(cache, inst, conn);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index ab83b4a..9936946 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -27,7 +27,6 @@ import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
@@ -42,6 +41,7 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.replication.StatusUtil;
@@ -207,7 +207,7 @@ public class ReplicationServlet extends BasicServlet {
// Read the files from the workqueue in zk
String zkRoot = ZooUtil.getRoot(inst);
- final String workQueuePath = zkRoot + Constants.ZREPLICATION_WORK_QUEUE;
+ final String workQueuePath = zkRoot + ReplicationConstants.ZOO_WORK_QUEUE;
DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, ServerConfiguration.getSystemConfiguration(inst));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index dd3c16e..e1a62de 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -111,6 +111,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.SecurityUtil;
@@ -2893,7 +2894,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
try {
// The replication service is unique to the thrift service for a tserver, not just a host.
// Advertise the host and port for replication service given the host and port for the tserver.
- ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + Constants.ZREPLICATION_TSERVERS + "/" + clientAddress.toString(),
+ ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + ReplicationConstants.ZOO_TSERVERS + "/" + clientAddress.toString(),
sp.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE);
} catch (Exception e) {
log.error("Could not advertise replication service port", e);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index a223511..20da0d6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -18,11 +18,11 @@ package org.apache.accumulo.tserver.replication;
import java.util.concurrent.ThreadPoolExecutor;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.security.SystemCredentials;
@@ -63,10 +63,10 @@ public class ReplicationWorker implements Runnable {
DistributedWorkQueue workQueue;
if (defaultDelay != delay && defaultPeriod != period) {
log.debug("Configuration DistributedWorkQueue with delay and period of {} and {}", delay, period);
- workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_WORK_QUEUE, conf, delay, period);
+ workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_WORK_QUEUE, conf, delay, period);
} else {
log.debug("Configuring DistributedWorkQueue with default delay and period");
- workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_WORK_QUEUE, conf);
+ workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_WORK_QUEUE, conf);
}
workQueue.startProcessing(new ReplicationProcessor(inst, conf, fs, SystemCredentials.get()), executor);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ad4ea6b2/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
index 96e8b52..0f4cf5b 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
@@ -24,6 +24,7 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooReader;
@@ -68,7 +69,7 @@ public class MultiTserverReplicationIT extends ConfigurableMacIT {
for (String tserver : tserverHost) {
try {
- byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS + "/" + tserver, null);
+ byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_TSERVERS + "/" + tserver, null);
HostAndPort replAddress = HostAndPort.fromString(new String(portData, StandardCharsets.UTF_8));
replicationServices.add(replAddress);
} catch (Exception e) {