You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/06 19:43:59 UTC
[1/2] git commit: ACCUMULO-378 Test stabilization
Repository: accumulo
Updated Branches:
refs/heads/ACCUMULO-378 5365b5501 -> 3a4cb95d8
ACCUMULO-378 Test stabilization
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8f531413
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8f531413
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8f531413
Branch: refs/heads/ACCUMULO-378
Commit: 8f53141334822045495e37a62b9a7f4685bbd4dc
Parents: 5365b55
Author: Josh Elser <el...@apache.org>
Authored: Fri Jun 6 13:42:14 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Jun 6 13:42:14 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/test/replication/ReplicationTest.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8f531413/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
index 422130c..a9b8b74 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
@@ -891,6 +891,11 @@ public class ReplicationTest extends ConfigurableMacIT {
conn.tableOperations().compact(ReplicationTable.NAME, null, null, true, true);
+ // Master is creating entries in the replication table from the metadata table every second.
+ // Compaction should trigger the record to be written to metadata. Wait a bit to ensure
+ // that the master has time to work.
+ Thread.sleep(5000);
+
s = ReplicationTable.getScanner(conn);
StatusSection.limit(s);
Assert.assertEquals(2, Iterables.size(s));
[2/2] git commit: ACCUMULO-378 More reviewboard changes.
Posted by el...@apache.org.
ACCUMULO-378 More reviewboard changes.
Make Combiner members internal again and remove ReplicationTable
from core (the table name is now in ReplicationConstants).
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3a4cb95d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3a4cb95d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3a4cb95d
Branch: refs/heads/ACCUMULO-378
Commit: 3a4cb95d82bfd52d8354f049d27368091421ab9d
Parents: 8f53141
Author: Josh Elser <el...@apache.org>
Authored: Fri Jun 6 13:42:32 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Jun 6 13:42:32 2014 -0400
----------------------------------------------------------------------
.../client/impl/ReplicationOperationsImpl.java | 6 +-
.../replication/ReplicaSystemFactory.java | 4 +-
.../client/replication/ReplicationTable.java | 61 --------------------
.../accumulo/core/iterators/Combiner.java | 4 +-
.../replication/PrintReplicationRecords.java | 3 +-
.../core/replication/ReplicaSystemHelper.java | 4 +-
.../core/replication/ReplicationConstants.java | 2 +
.../ReplicationOperationsImplTest.java | 25 ++++----
.../server/replication/ReplicationTable.java | 38 +++++++++++-
.../server/replication/StatusCombinerTest.java | 6 +-
.../monitor/servlets/ReplicationServlet.java | 7 +--
11 files changed, 69 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a4cb95d/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index b16f38f..f820aa4 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.replication.PeerExistsException;
import org.apache.accumulo.core.client.replication.PeerNotFoundException;
import org.apache.accumulo.core.client.replication.ReplicaSystem;
-import org.apache.accumulo.core.client.replication.ReplicationTable;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
@@ -45,6 +44,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
@@ -152,7 +152,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
log.info("reading from replication table");
boolean allReplicationRefsReplicated = false;
while (!allReplicationRefsReplicated) {
- BatchScanner bs = conn.createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4);
+ BatchScanner bs = conn.createBatchScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY, 4);
bs.setRanges(Collections.singleton(new Range()));
try {
allReplicationRefsReplicated = allReferencesReplicated(bs, tableId, wals);
@@ -216,7 +216,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
protected Text getTableId(Connector conn, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
TableOperations tops = conn.tableOperations();
- while (!tops.exists(ReplicationTable.NAME)) {
+ while (!tops.exists(ReplicationConstants.TABLE_NAME)) {
UtilWaitThread.sleep(200);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a4cb95d/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
index 164512a..e721278 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
@@ -47,9 +47,9 @@ public class ReplicaSystemFactory {
try {
Class<?> clz = Class.forName(name);
- Object o = clz.newInstance();
- if (ReplicaSystem.class.isAssignableFrom(o.getClass())) {
+ if (ReplicaSystem.class.isAssignableFrom(clz)) {
+ Object o = clz.newInstance();
ReplicaSystem rs = (ReplicaSystem) o;
rs.configure(configuration);
return rs;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a4cb95d/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java
deleted file mode 100644
index 0b2b9a8..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.replication;
-
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.security.Authorizations;
-
-/**
- *
- */
-public class ReplicationTable {
- public static final String NAME = "replication";
-
- public static Scanner getScanner(Connector conn, Authorizations auths) throws TableNotFoundException {
- return conn.createScanner(NAME, auths);
- }
-
- public static Scanner getScanner(Connector conn) throws TableNotFoundException {
- return getScanner(conn, new Authorizations());
- }
-
- public static BatchWriter getBatchWriter(Connector conn) throws TableNotFoundException {
- return getBatchWriter(conn, new BatchWriterConfig());
- }
-
- public static BatchWriter getBatchWriter(Connector conn, BatchWriterConfig config) throws TableNotFoundException {
- return conn.createBatchWriter(NAME, config);
- }
-
- public static BatchScanner getBatchScanner(Connector conn, int queryThreads) throws TableNotFoundException {
- return conn.createBatchScanner(NAME, new Authorizations(), queryThreads);
- }
-
- public static boolean exists(Connector conn) {
- return exists(conn.tableOperations());
- }
-
- public static boolean exists(TableOperations tops) {
- return tops.exists(NAME);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a4cb95d/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java b/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java
index 4feb3ea..ceb4411 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java
@@ -53,8 +53,8 @@ import org.apache.log4j.Logger;
*/
public abstract class Combiner extends WrappingIterator implements OptionDescriber {
static final Logger log = Logger.getLogger(Combiner.class);
- public static final String COLUMNS_OPTION = "columns";
- public static final String ALL_OPTION = "all";
+ protected static final String COLUMNS_OPTION = "columns";
+ protected static final String ALL_OPTION = "all";
/**
* A Java Iterator that iterates over the Values for a given Key from a source SortedKeyValueIterator.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a4cb95d/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
index 5104d39..2aef652 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
@@ -24,7 +24,6 @@ import java.util.Map.Entry;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.replication.ReplicationTable;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
@@ -81,7 +80,7 @@ public class PrintReplicationRecords implements Runnable {
out.println("--------------------------------------------------------------------");
try {
- s = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
+ s = conn.createScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY);
} catch (TableNotFoundException e) {
log.error("Replication table does not exist");
return;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a4cb95d/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
index 660862c..a022362 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
@@ -19,10 +19,10 @@ package org.apache.accumulo.core.replication;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.replication.ReplicationTable;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
@@ -58,7 +58,7 @@ public class ReplicaSystemHelper {
*/
public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
- BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig());
try {
log.debug("Recording new status for {}, {}", filePath.toString(), ProtobufUtil.toString (status));
Mutation m = new Mutation(filePath.toString());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a4cb95d/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
index 517920c..9815634 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
@@ -20,6 +20,8 @@ package org.apache.accumulo.core.replication;
*
*/
public class ReplicationConstants {
+ public static final String TABLE_NAME = "replication";
+
// Constants for replication information in zookeeper
public static final String ZOO_BASE = "/replication";
public static final String ZOO_WORK_QUEUE = ZOO_BASE + "/workqueue";
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a4cb95d/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
index b4a4c65..7cc839f 100644
--- a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
@@ -26,7 +26,6 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.impl.ReplicationOperationsImpl;
import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.replication.ReplicationTable;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
@@ -68,14 +67,14 @@ public class ReplicationOperationsImplTest {
@Test
public void waitsUntilEntriesAreReplicated() throws Exception {
Connector conn = inst.getConnector("root", new PasswordToken(""));
- conn.tableOperations().create(ReplicationTable.NAME);
+ conn.tableOperations().create(ReplicationConstants.TABLE_NAME);
conn.tableOperations().create("foo");
Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo"));
String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
- BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+ BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig());
Mutation m = new Mutation(file1);
StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
@@ -137,7 +136,7 @@ public class ReplicationOperationsImplTest {
Assert.assertFalse(done.get());
// Remove the replication entries too
- bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+ bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig());
m = new Mutation(file1);
m.putDelete(StatusSection.NAME, tableId);
bw.addMutation(m);
@@ -164,7 +163,7 @@ public class ReplicationOperationsImplTest {
@Test
public void unrelatedReplicationRecordsDontBlockDrain() throws Exception {
Connector conn = inst.getConnector("root", new PasswordToken(""));
- conn.tableOperations().create(ReplicationTable.NAME);
+ conn.tableOperations().create(ReplicationConstants.TABLE_NAME);
conn.tableOperations().create("foo");
conn.tableOperations().create("bar");
@@ -174,7 +173,7 @@ public class ReplicationOperationsImplTest {
String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
- BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+ BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig());
Mutation m = new Mutation(file1);
StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
@@ -228,7 +227,7 @@ public class ReplicationOperationsImplTest {
Assert.assertFalse(done.get());
// Remove the replication entries too
- bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+ bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig());
m = new Mutation(file1);
m.putDelete(StatusSection.NAME, tableId1);
bw.addMutation(m);
@@ -248,7 +247,7 @@ public class ReplicationOperationsImplTest {
@Test
public void inprogressReplicationRecordsBlockExecution() throws Exception {
Connector conn = inst.getConnector("root", new PasswordToken(""));
- conn.tableOperations().create(ReplicationTable.NAME);
+ conn.tableOperations().create(ReplicationConstants.TABLE_NAME);
conn.tableOperations().create("foo");
Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
@@ -256,7 +255,7 @@ public class ReplicationOperationsImplTest {
String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
- BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+ BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig());
Mutation m = new Mutation(file1);
StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
@@ -314,7 +313,7 @@ public class ReplicationOperationsImplTest {
Assert.assertFalse(done.get());
// Remove the replication entries too
- bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+ bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig());
m = new Mutation(file1);
m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus));
bw.addMutation(m);
@@ -334,7 +333,7 @@ public class ReplicationOperationsImplTest {
@Test
public void laterCreatedLogsDontBlockExecution() throws Exception {
Connector conn = inst.getConnector("root", new PasswordToken(""));
- conn.tableOperations().create(ReplicationTable.NAME);
+ conn.tableOperations().create(ReplicationConstants.TABLE_NAME);
conn.tableOperations().create("foo");
Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
@@ -342,7 +341,7 @@ public class ReplicationOperationsImplTest {
String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
- BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+ BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig());
Mutation m = new Mutation(file1);
StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
bw.addMutation(m);
@@ -396,7 +395,7 @@ public class ReplicationOperationsImplTest {
System.out.println(e.getKey());
}
- bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+ bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig());
m = new Mutation(file1);
m.putDelete(StatusSection.NAME, tableId1);
bw.addMutation(m);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a4cb95d/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
index 11edbb1..e7f10dc 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
@@ -25,8 +25,12 @@ import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.IteratorSetting.Column;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -36,7 +40,9 @@ import org.apache.accumulo.core.iterators.Combiner;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.StatusFormatter;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.hadoop.io.Text;
@@ -45,9 +51,11 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
-public class ReplicationTable extends org.apache.accumulo.core.client.replication.ReplicationTable {
+public class ReplicationTable {
private static final Logger log = LoggerFactory.getLogger(ReplicationTable.class);
+ public static final String NAME = ReplicationConstants.TABLE_NAME;
+
public static final String COMBINER_NAME = "statuscombiner";
public static final String STATUS_LG_NAME = StatusSection.NAME.toString();
@@ -188,4 +196,32 @@ public class ReplicationTable extends org.apache.accumulo.core.client.replicatio
return true;
}
+
+ public static Scanner getScanner(Connector conn, Authorizations auths) throws TableNotFoundException {
+ return conn.createScanner(NAME, auths);
+ }
+
+ public static Scanner getScanner(Connector conn) throws TableNotFoundException {
+ return getScanner(conn, new Authorizations());
+ }
+
+ public static BatchWriter getBatchWriter(Connector conn) throws TableNotFoundException {
+ return getBatchWriter(conn, new BatchWriterConfig());
+ }
+
+ public static BatchWriter getBatchWriter(Connector conn, BatchWriterConfig config) throws TableNotFoundException {
+ return conn.createBatchWriter(NAME, config);
+ }
+
+ public static BatchScanner getBatchScanner(Connector conn, int queryThreads) throws TableNotFoundException {
+ return conn.createBatchScanner(NAME, new Authorizations(), queryThreads);
+ }
+
+ public static boolean exists(Connector conn) {
+ return exists(conn.tableOperations());
+ }
+
+ public static boolean exists(TableOperations tops) {
+ return tops.exists(NAME);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a4cb95d/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
index 5bc2488..c76c19f 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
@@ -20,6 +20,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.IteratorSetting.Column;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -51,7 +53,9 @@ public class StatusCombinerTest {
key = new Key();
combiner = new StatusCombiner();
builder = Status.newBuilder();
- combiner.init(new DevNull(), ImmutableMap.of(Combiner.COLUMNS_OPTION, StatusSection.NAME.toString()), new IteratorEnvironment() {
+ IteratorSetting cfg = new IteratorSetting(50, StatusCombiner.class);
+ Combiner.setColumns(cfg, Collections.singletonList(new Column(StatusSection.NAME)));
+ combiner.init(new DevNull(), cfg.getOptions(), new IteratorEnvironment() {
public AccumuloConfiguration getConfig() {
return null;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a4cb95d/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 69c5b55..2fcd67d 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.replication.ReplicaSystem;
import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.client.replication.ReplicationTable;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
@@ -87,7 +86,7 @@ public class ReplicationServlet extends BasicServlet {
Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
TableOperations tops = conn.tableOperations();
- if (!tops.exists(ReplicationTable.NAME)) {
+ if (!tops.exists(ReplicationConstants.TABLE_NAME)) {
banner(sb, "", "Replication table does not yet exist");
return;
}
@@ -155,7 +154,7 @@ public class ReplicationServlet extends BasicServlet {
}
// Read over the queued work
- BatchScanner bs = conn.createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4);
+ BatchScanner bs = conn.createBatchScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY, 4);
bs.setRanges(Collections.singleton(new Range()));
WorkSection.limit(bs);
try {
@@ -224,7 +223,7 @@ public class ReplicationServlet extends BasicServlet {
String path = null;
if (null != data) {
path = new String(data);
- Scanner s = ReplicationTable.getScanner(conn);
+ Scanner s = conn.createScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY);
s.setRange(Range.exact(path));
s.fetchColumn(WorkSection.NAME, target.toText());