Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/07/30 23:51:36 UTC
[02/14] accumulo git commit: ACCUMULO-3920 Convert more tests from mock
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
new file mode 100644
index 0000000..fb702a2
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.master;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.master.thrift.MasterState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.state.MergeStats;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+public class MergeStateIT extends ConfigurableMacBase {
+
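+ // Minimal CurrentState stub: one live tserver, table "t" online, and only the merge under test.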
+ private static class MockCurrentState implements CurrentState {
+
+ TServerInstance someTServer = new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), 0x123456);
+ MergeInfo mergeInfo;
+
+ MockCurrentState(MergeInfo info) {
+ this.mergeInfo = info;
+ }
+
+ @Override
+ public Set<String> onlineTables() {
+ return Collections.singleton("t");
+ }
+
+ @Override
+ public Set<TServerInstance> onlineTabletServers() {
+ return Collections.singleton(someTServer);
+ }
+
+ @Override
+ public Collection<MergeInfo> merges() {
+ return Collections.singleton(mergeInfo);
+ }
+
+ @Override
+ public Collection<KeyExtent> migrations() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public MasterState getMasterState() {
+ return MasterState.NORMAL;
+ }
+
+ @Override
+ public Set<TServerInstance> shutdownServers() {
+ return Collections.emptySet();
+ }
+ }
+
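+ // Helper to apply a single mutation to the metadata table and flush it immediately.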
+ private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException {
+ BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ bw.addMutation(m);
+ bw.close();
+ }
+
+ @Test
+ public void test() throws Exception {
+ AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
+ Connector connector = getConnector();
+ EasyMock.expect(context.getConnector()).andReturn(connector).anyTimes();
+ EasyMock.replay(context);
+ connector.securityOperations().grantTablePermission(connector.whoami(), MetadataTable.NAME, TablePermission.WRITE);
+ BatchWriter bw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+
+ // Create fake metadata for a table "t" with these splits
+ String[] splits = {"a", "e", "j", "o", "t", "z"};
+ Text tableId = new Text("t");
+ Text pr = null;
+ for (String s : splits) {
+ Text split = new Text(s);
+ Mutation prevRow = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, split, pr));
+ prevRow.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
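+ // Mark the tablet chopped; only the column's presence matters, so any value ("junk") will do.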
+ ChoppedColumnFamily.CHOPPED_COLUMN.put(prevRow, new Value("junk".getBytes()));
+ bw.addMutation(prevRow);
+ pr = split;
+ }
+ // Add the default tablet
+ Mutation defaultTablet = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, null, pr));
+ defaultTablet.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
+ bw.addMutation(defaultTablet);
+ bw.close();
+
+ // Read out the TabletLocationStates
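+ // Merge the range (e, p]; "p" is not an existing split point, which the test exploits below.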
+ MockCurrentState state = new MockCurrentState(new MergeInfo(new KeyExtent(tableId, new Text("p"), new Text("e")), MergeInfo.Operation.MERGE));
+
+ // Count the tablet location states reported by the store
+ MetaDataStateStore metaDataStateStore = new MetaDataStateStore(context, state);
+ int count = 0;
+ for (TabletLocationState tss : metaDataStateStore) {
+ if (tss != null)
+ count++;
+ }
+ Assert.assertEquals(0, count); // the normal case is to skip tablets in a good state
+
+ // Create the hole
+ // Split the tablet at one end of the range
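+ // A tablet with SPLIT_RATIO and OLD_PREV_ROW set looks like a split that was started but never finished.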
+ Mutation m = new KeyExtent(tableId, new Text("t"), new Text("p")).getPrevRowUpdateMutation();
+ TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
+ TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(new Text("o")));
+ update(connector, m);
+
+ // do the state check
+ MergeStats stats = scan(state, metaDataStateStore);
+ MergeState newState = stats.nextMergeState(connector, state);
+ Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
+
+ // unassign the tablets
+ BatchDeleter deleter = connector.createBatchDeleter(MetadataTable.NAME, Authorizations.EMPTY, 1000, new BatchWriterConfig());
+ deleter.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+ deleter.setRanges(Collections.singletonList(new Range()));
+ deleter.delete();
+
+ // now we should be ready to merge, but the metadata is inconsistent
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
+
+ // finish the split
+ KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o"));
+ m = tablet.getPrevRowUpdateMutation();
+ TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
+ update(connector, m);
+ metaDataStateStore.setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer)));
+
+ // oh no, there's a new tablet online
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(connector, state));
+
+ // chop it
+ m = tablet.getPrevRowUpdateMutation();
+ ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("junk".getBytes()));
+ update(connector, m);
+
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
+
+ // take it offline
+ m = tablet.getPrevRowUpdateMutation();
+ Collection<Collection<String>> walogs = Collections.emptyList();
+ metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null);
+
+ // now we can merge
+ stats = scan(state, metaDataStateStore);
+ Assert.assertEquals(MergeState.MERGING, stats.nextMergeState(connector, state));
+
+ }
+
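+ // Recompute merge stats from the current tablet states, starting from WAITING_FOR_OFFLINE.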
+ private MergeStats scan(MockCurrentState state, MetaDataStateStore metaDataStateStore) {
+ MergeStats stats = new MergeStats(state.mergeInfo);
+ stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE);
+ for (TabletLocationState tss : metaDataStateStore) {
+ stats.update(tss.extent, tss.getState(state.onlineTabletServers()), tss.chopped, false);
+ }
+ return stats;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java b/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java
new file mode 100644
index 0000000..5519013
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.replication.FinishedWorkUpdater;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class FinishedWorkUpdaterIT extends ConfigurableMacBase {
+
+ private Connector conn;
+ private FinishedWorkUpdater updater;
+
+ @Before
+ public void configureUpdater() throws Exception {
+ conn = getConnector();
+ updater = new FinishedWorkUpdater(conn);
+ }
+
+ @Test
+ public void offlineReplicationTableFailsGracefully() {
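+ // The replication table starts offline; the updater should return cleanly rather than throw.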
+ updater.run();
+ }
+
+ @Test
+ public void recordsWithProgressUpdateBothTables() throws Exception {
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+ ReplicationTable.setOnline(conn);
+
+ String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
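+ // Status semantics (per the replication Status proto): begin/end bound the replicated data range, closed marks the WAL as finished.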
+ Status stat = Status.newBuilder().setBegin(100).setEnd(200).setClosed(true).setInfiniteEnd(false).build();
+ ReplicationTarget target = new ReplicationTarget("peer", "table1", "1");
+
+ // Create a single work record for a file to some peer
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ Mutation m = new Mutation(file);
+ WorkSection.add(m, target.toText(), ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+ bw.close();
+
+ updater.run();
+
+ Scanner s = ReplicationTable.getScanner(conn);
+ s.setRange(Range.exact(file));
+ StatusSection.limit(s);
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+
+ Assert.assertEquals(StatusSection.NAME, entry.getKey().getColumnFamily());
+ Assert.assertEquals(target.getSourceTableId(), entry.getKey().getColumnQualifier().toString());
+
+ // We only rely on the begin attribute being returned correctly
+ Status actual = Status.parseFrom(entry.getValue().get());
+ Assert.assertEquals(stat.getBegin(), actual.getBegin());
+ }
+
+ @Test
+ public void chooseMinimumBeginOffset() throws Exception {
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+ ReplicationTable.setOnline(conn);
+
+ String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+ // @formatter:off
+ Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(),
+ stat2 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(),
+ stat3 = Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(false).build();
+ ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"),
+ target2 = new ReplicationTarget("peer2", "table2", "1"),
+ target3 = new ReplicationTarget("peer3", "table3", "1");
+ // @formatter:on
+
+ // Create a single work record for a file to some peer
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ Mutation m = new Mutation(file);
+ WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1));
+ WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2));
+ WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3));
+ bw.addMutation(m);
+ bw.close();
+
+ updater.run();
+
+ Scanner s = ReplicationTable.getScanner(conn);
+ s.setRange(Range.exact(file));
+ StatusSection.limit(s);
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+
+ Assert.assertEquals(StatusSection.NAME, entry.getKey().getColumnFamily());
+ Assert.assertEquals(target1.getSourceTableId(), entry.getKey().getColumnQualifier().toString());
+
+ // The minimum begin offset across the targets should be returned
+ Status actual = Status.parseFrom(entry.getValue().get());
+ Assert.assertEquals(1, actual.getBegin());
+ }
+
+ @Test
+ public void chooseMinimumBeginOffsetInfiniteEnd() throws Exception {
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+ ReplicationTable.setOnline(conn);
+
+ String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+ // @formatter:off
+ Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(),
+ stat2 = Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(),
+ stat3 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(true).build();
+ ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"),
+ target2 = new ReplicationTarget("peer2", "table2", "1"),
+ target3 = new ReplicationTarget("peer3", "table3", "1");
+ // @formatter:on
+
+ // Create a single work record for a file to some peer
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ Mutation m = new Mutation(file);
+ WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1));
+ WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2));
+ WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3));
+ bw.addMutation(m);
+ bw.close();
+
+ updater.run();
+
+ Scanner s = ReplicationTable.getScanner(conn);
+ s.setRange(Range.exact(file));
+ StatusSection.limit(s);
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+
+ Assert.assertEquals(StatusSection.NAME, entry.getKey().getColumnFamily());
+ Assert.assertEquals(target1.getSourceTableId(), entry.getKey().getColumnQualifier().toString());
+
+ // The minimum begin offset across the targets should be returned
+ Status actual = Status.parseFrom(entry.getValue().get());
+ Assert.assertEquals(1, actual.getBegin());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java b/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java
new file mode 100644
index 0000000..df1f64f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.replication.RemoveCompleteReplicationRecords;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class RemoveCompleteReplicationRecordsIT extends ConfigurableMacBase {
+
+ private MockRemoveCompleteReplicationRecords rcrr;
+ private Connector conn;
+
+ private static class MockRemoveCompleteReplicationRecords extends RemoveCompleteReplicationRecords {
+
+ public MockRemoveCompleteReplicationRecords(Connector conn) {
+ super(conn);
+ }
+
+ @Override
+ public long removeCompleteRecords(Connector conn, BatchScanner bs, BatchWriter bw) {
+ return super.removeCompleteRecords(conn, bs, bw);
+ }
+
+ }
+
+ @Before
+ public void initialize() throws Exception {
+ conn = getConnector();
+ rcrr = new MockRemoveCompleteReplicationRecords(conn);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+ ReplicationTable.setOnline(conn);
+ }
+
+ @Test
+ public void notYetReplicationRecordsIgnored() throws Exception {
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ int numRecords = 3;
+ for (int i = 0; i < numRecords; i++) {
+ String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ Mutation m = new Mutation(file);
+ StatusSection.add(m, new Text(Integer.toString(i)), StatusUtil.openWithUnknownLengthValue());
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+
+ BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
+ bs.setRanges(Collections.singleton(new Range()));
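+ // WholeRowIterator serializes each row into a single key/value so a row is evaluated atomically.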
+ IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
+ bs.addScanIterator(cfg);
+ bw = EasyMock.createMock(BatchWriter.class);
+
+ EasyMock.replay(bw);
+
+ rcrr.removeCompleteRecords(conn, bs, bw);
+ bs.close();
+
+ Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+ }
+
+ @Test
+ public void partiallyReplicatedRecordsIgnored() throws Exception {
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ int numRecords = 3;
+ Status.Builder builder = Status.newBuilder();
+ builder.setClosed(false);
+ builder.setEnd(10000);
+ builder.setInfiniteEnd(false);
+ for (int i = 0; i < numRecords; i++) {
+ String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ Mutation m = new Mutation(file);
+ StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+
+ BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
+ bs.setRanges(Collections.singleton(new Range()));
+ IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
+ bs.addScanIterator(cfg);
+ bw = EasyMock.createMock(BatchWriter.class);
+
+ EasyMock.replay(bw);
+
+ // We don't remove any records, so a mocked BatchWriter with no expectations suffices
+ rcrr.removeCompleteRecords(conn, bs, bw);
+ bs.close();
+
+ Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+ }
+
+ @Test
+ public void replicatedClosedWorkRecordsAreNotRemovedWithoutClosedStatusRecords() throws Exception {
+ BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
+ int numRecords = 3;
+
+ Status.Builder builder = Status.newBuilder();
+ builder.setClosed(false);
+ builder.setEnd(10000);
+ builder.setInfiniteEnd(false);
+
+ // Write out numRecords entries to the replication table, none of which are fully replicated
+ for (int i = 0; i < numRecords; i++) {
+ String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ Mutation m = new Mutation(file);
+ StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
+ replBw.addMutation(m);
+ }
+
+ // Add two records that are fully replicated but not closed
+ String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ Mutation m = new Mutation(fileToRemove);
+ StatusSection.add(m, new Text("5"), ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(false).build()));
+ replBw.addMutation(m);
+
+ numRecords++;
+
+ fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ m = new Mutation(fileToRemove);
+ StatusSection.add(m, new Text("6"), ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(false).build()));
+ replBw.addMutation(m);
+
+ numRecords++;
+
+ replBw.flush();
+
+ // Make sure that we have the expected number of records in the replication table
+ Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+
+ // We should not remove any records because they're missing closed status
+ BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
+ bs.setRanges(Collections.singleton(new Range()));
+ IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
+ bs.addScanIterator(cfg);
+
+ try {
+ Assert.assertEquals(0L, rcrr.removeCompleteRecords(conn, bs, replBw));
+ } finally {
+ bs.close();
+ replBw.close();
+ }
+ }
+
+ @Test
+ public void replicatedClosedRowsAreRemoved() throws Exception {
+ BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
+ int numRecords = 3;
+
+ Status.Builder builder = Status.newBuilder();
+ builder.setClosed(false);
+ builder.setEnd(10000);
+ builder.setInfiniteEnd(false);
+
+ long time = System.currentTimeMillis();
+ // Write out numRecords entries to the replication table, none of which are fully replicated
+ for (int i = 0; i < numRecords; i++) {
+ builder.setCreatedTime(time++);
+ String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ Mutation m = new Mutation(file);
+ Value v = ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build());
+ StatusSection.add(m, new Text(Integer.toString(i)), v);
+ replBw.addMutation(m);
+ m = OrderSection.createMutation(file, time);
+ OrderSection.add(m, new Text(Integer.toString(i)), v);
+ replBw.addMutation(m);
+ }
+
+ Set<String> filesToRemove = new HashSet<>();
+ // We created two mutations for each file
+ numRecords *= 2;
+ int finalNumRecords = numRecords;
+
+ // Add two records that we can delete
+ String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ filesToRemove.add(fileToRemove);
+ Mutation m = new Mutation(fileToRemove);
+ ReplicationTarget target = new ReplicationTarget("peer1", "5", "5");
+ Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build());
+ StatusSection.add(m, new Text("5"), value);
+ WorkSection.add(m, target.toText(), value);
+ replBw.addMutation(m);
+
+ m = OrderSection.createMutation(fileToRemove, time);
+ OrderSection.add(m, new Text("5"), value);
+ replBw.addMutation(m);
+ time++;
+
+ numRecords += 3;
+
+ fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ filesToRemove.add(fileToRemove);
+ m = new Mutation(fileToRemove);
+ value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build());
+ target = new ReplicationTarget("peer1", "6", "6");
+ StatusSection.add(m, new Text("6"), value);
+ WorkSection.add(m, target.toText(), value);
+ replBw.addMutation(m);
+
+ m = OrderSection.createMutation(fileToRemove, time);
+ OrderSection.add(m, new Text("6"), value);
+ replBw.addMutation(m);
+ time++;
+
+ numRecords += 3;
+
+ replBw.flush();
+
+ // Make sure that we have the expected number of records in the replication table
+ Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+
+ // We should remove the two fully replicated, closed rows we inserted (their status and work entries: four in total)
+ BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
+ bs.setRanges(Collections.singleton(new Range()));
+ StatusSection.limit(bs);
+ WorkSection.limit(bs);
+ IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
+ bs.addScanIterator(cfg);
+
+ try {
+ Assert.assertEquals(4L, rcrr.removeCompleteRecords(conn, bs, replBw));
+ } finally {
+ bs.close();
+ replBw.close();
+ }
+
+ int actualRecords = 0;
+ for (Entry<Key,Value> entry : ReplicationTable.getScanner(conn)) {
+ Assert.assertFalse(filesToRemove.contains(entry.getKey().getRow().toString()));
+ actualRecords++;
+ }
+
+ Assert.assertEquals(finalNumRecords, actualRecords);
+ }
+
+ @Test
+ public void partiallyReplicatedEntriesPrecludeRowDeletion() throws Exception {
+ BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
+ int numRecords = 3;
+
+ Status.Builder builder = Status.newBuilder();
+ builder.setClosed(false);
+ builder.setEnd(10000);
+ builder.setInfiniteEnd(false);
+
+ // Write out numRecords entries to the replication table, none of which are fully replicated
+ for (int i = 0; i < numRecords; i++) {
+ String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ Mutation m = new Mutation(file);
+ StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
+ replBw.addMutation(m);
+ }
+
+ // Add one row with several entries, one of which is not fully replicated
+ String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ Mutation m = new Mutation(fileToRemove);
+ ReplicationTarget target = new ReplicationTarget("peer1", "5", "5");
+ Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).build());
+ StatusSection.add(m, new Text("5"), value);
+ WorkSection.add(m, target.toText(), value);
+ target = new ReplicationTarget("peer2", "5", "5");
+ WorkSection.add(m, target.toText(), value);
+ target = new ReplicationTarget("peer3", "5", "5");
+ WorkSection.add(m, target.toText(), ProtobufUtil.toValue(builder.setClosed(false).build()));
+ replBw.addMutation(m);
+
+ numRecords += 4;
+
+ replBw.flush();
+
+ // Make sure that we have the expected number of records in the replication table
+ Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+
+ // Nothing should be removed because one of the row's entries is still incomplete
+ BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
+ bs.setRanges(Collections.singleton(new Range()));
+ IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
+ bs.addScanIterator(cfg);
+
+ try {
+ Assert.assertEquals(0L, rcrr.removeCompleteRecords(conn, bs, replBw));
+ } finally {
+ bs.close();
+ replBw.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java
new file mode 100644
index 0000000..fa026d1
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.ReplicationOperationsImpl;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.master.MasterClientServiceHandler;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationOperationsImplIT extends ConfigurableMacBase {
+ private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImplIT.class);
+
+ private Instance inst;
+ private Connector conn;
+
+ @Before
+ public void configureInstance() throws Exception {
+ conn = getConnector();
+ inst = conn.getInstance();
+ ReplicationTable.setOnline(conn);
+ conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+ }
+
+ /**
+ * Spoof out the Master so we can call the implementation without starting a full instance.
+ */
+ private ReplicationOperationsImpl getReplicationOperations() throws Exception {
+ Master master = EasyMock.createMock(Master.class);
+ EasyMock.expect(master.getConnector()).andReturn(conn).anyTimes();
+ EasyMock.expect(master.getInstance()).andReturn(inst).anyTimes();
+ EasyMock.replay(master);
+
+ final MasterClientServiceHandler mcsh = new MasterClientServiceHandler(master) {
+ @Override
+ protected String getTableId(Instance inst, String tableName) throws ThriftTableOperationException {
+ try {
+ return conn.tableOperations().tableIdMap().get(tableName);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken(ROOT_PASSWORD)), getClientConfig());
+ return new ReplicationOperationsImpl(context) {
+ @Override
+ protected boolean getMasterDrain(final TInfo tinfo, final TCredentials rpcCreds, final String tableName, final Set<String> wals)
+ throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ try {
+ return mcsh.drainReplicationTable(tinfo, rpcCreds, tableName, wals);
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ @Test
+ public void waitsUntilEntriesAreReplicated() throws Exception {
+ conn.tableOperations().create("foo");
+ Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo"));
+
+ String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+ Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+
+ Mutation m = new Mutation(file1);
+ StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+
+ m = new Mutation(file2);
+ StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+
+ bw.close();
+
+ bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+ m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat));
+
+ bw.addMutation(m);
+
+ m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+ m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+
+ bw.close();
+
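+ // Run drain() in a background thread so the test can assert that it blocks until the records are gone.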
+ final AtomicBoolean done = new AtomicBoolean(false);
+ final AtomicBoolean exception = new AtomicBoolean(false);
+ final ReplicationOperationsImpl roi = getReplicationOperations();
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ roi.drain("foo");
+ } catch (Exception e) {
+ log.error("Got error", e);
+ exception.set(true);
+ }
+ done.set(true);
+ }
+ });
+
+ t.start();
+
+ // With the records still present, the drain should not have completed
+ Assert.assertFalse(done.get());
+
+ bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+ m.putDelete(ReplicationSection.COLF, tableId);
+ bw.addMutation(m);
+ bw.flush();
+
+ Assert.assertFalse(done.get());
+
+ m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+ m.putDelete(ReplicationSection.COLF, tableId);
+ bw.addMutation(m);
+ bw.flush();
+ bw.close();
+
+ // Removing metadata entries doesn't change anything
+ Assert.assertFalse(done.get());
+
+ // Remove the replication entries too
+ bw = ReplicationTable.getBatchWriter(conn);
+ m = new Mutation(file1);
+ m.putDelete(StatusSection.NAME, tableId);
+ bw.addMutation(m);
+ bw.flush();
+
+ Assert.assertFalse(done.get());
+
+ m = new Mutation(file2);
+ m.putDelete(StatusSection.NAME, tableId);
+ bw.addMutation(m);
+ bw.flush();
+
+ try {
+ t.join(5000);
+ } catch (InterruptedException e) {
+ Assert.fail("ReplicationOperations.drain did not complete");
+ }
+
+ // After both the metadata and replication entries are removed, the drain completes
+ Assert.assertTrue("Drain never finished", done.get());
+ Assert.assertFalse("Saw unexpected exception", exception.get());
+ }
+
+ @Test
+ public void unrelatedReplicationRecordsDontBlockDrain() throws Exception {
+ conn.tableOperations().create("foo");
+ conn.tableOperations().create("bar");
+
+ Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+ Text tableId2 = new Text(conn.tableOperations().tableIdMap().get("bar"));
+
+ String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+ Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+
+ Mutation m = new Mutation(file1);
+ StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+
+ m = new Mutation(file2);
+ StatusSection.add(m, tableId2, ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+
+ bw.close();
+
+ bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+ m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+
+ bw.addMutation(m);
+
+ m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+ m.put(ReplicationSection.COLF, tableId2, ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+
+ bw.close();
+
+ final AtomicBoolean done = new AtomicBoolean(false);
+ final AtomicBoolean exception = new AtomicBoolean(false);
+
+ final ReplicationOperationsImpl roi = getReplicationOperations();
+
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ roi.drain("foo");
+ } catch (Exception e) {
+ log.error("Got error", e);
+ exception.set(true);
+ }
+ done.set(true);
+ }
+ });
+
+ t.start();
+
+ // With the records still present, the drain should not have completed
+ Assert.assertFalse(done.get());
+
+ bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+ m.putDelete(ReplicationSection.COLF, tableId1);
+ bw.addMutation(m);
+ bw.flush();
+
+ // Removing metadata entries doesn't change anything
+ Assert.assertFalse(done.get());
+
+ // Remove the replication entries too
+ bw = ReplicationTable.getBatchWriter(conn);
+ m = new Mutation(file1);
+ m.putDelete(StatusSection.NAME, tableId1);
+ bw.addMutation(m);
+ bw.flush();
+
+ try {
+ t.join(5000);
+ } catch (InterruptedException e) {
+ Assert.fail("ReplicationOperations.drain did not complete");
+ }
+
+ // After both the metadata and replication entries are removed, the drain completes
+ Assert.assertTrue("Drain never completed", done.get());
+ Assert.assertFalse("Saw unexpected exception", exception.get());
+ }
+
+ @Test
+ public void inprogressReplicationRecordsBlockExecution() throws Exception {
+ conn.tableOperations().create("foo");
+
+ Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+
+ String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+ Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+
+ Mutation m = new Mutation(file1);
+ StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+ bw.close();
+
+ LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId1), null, null), System.currentTimeMillis(), "tserver", file1);
+
+ bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+ m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+
+ m = new Mutation(logEntry.getRow());
+ m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+ bw.addMutation(m);
+
+ bw.close();
+
+ final AtomicBoolean done = new AtomicBoolean(false);
+ final AtomicBoolean exception = new AtomicBoolean(false);
+ final ReplicationOperationsImpl roi = getReplicationOperations();
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ roi.drain("foo");
+ } catch (Exception e) {
+ log.error("Got error", e);
+ exception.set(true);
+ }
+ done.set(true);
+ }
+ });
+
+ t.start();
+
+ // With the records still present, the drain should not have completed
+ Assert.assertFalse(done.get());
+
+ Status newStatus = Status.newBuilder().setBegin(1000).setEnd(2000).setInfiniteEnd(false).setClosed(true).build();
+ bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+ m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(newStatus));
+ bw.addMutation(m);
+ bw.flush();
+
+ // Updating the metadata entry with new progress doesn't unblock the drain
+ Assert.assertFalse(done.get());
+
+ // Update the replication entry with the new status too
+ bw = ReplicationTable.getBatchWriter(conn);
+ m = new Mutation(file1);
+ m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus));
+ bw.addMutation(m);
+ bw.flush();
+
+ try {
+ t.join(5000);
+ } catch (InterruptedException e) {
+ Assert.fail("ReplicationOperations.drain did not complete");
+ }
+
+ // New status records that are not fully replicated do not allow the drain to complete
+ Assert.assertFalse("Drain somehow finished", done.get());
+ Assert.assertFalse("Saw unexpected exception", exception.get());
+ }
+
+ @Test
+ public void laterCreatedLogsDontBlockExecution() throws Exception {
+ conn.tableOperations().create("foo");
+
+ Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+
+ String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+ Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ Mutation m = new Mutation(file1);
+ StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+ bw.close();
+
+ bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+ m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+
+ bw.close();
+
+ log.info("Reading metadata first time");
+ for (Entry<Key,Value> e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+ log.info("{}", e.getKey());
+ }
+
+ final AtomicBoolean done = new AtomicBoolean(false);
+ final AtomicBoolean exception = new AtomicBoolean(false);
+ final ReplicationOperationsImpl roi = getReplicationOperations();
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ roi.drain("foo");
+ } catch (Exception e) {
+ log.error("Got error", e);
+ exception.set(true);
+ }
+ done.set(true);
+ }
+ });
+
+ t.start();
+
+ // Wait long enough for the drain thread to scan the metadata table at least once
+ Thread.sleep(2000);
+
+ // Write another file, but also delete the old files
+ bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ m = new Mutation(ReplicationSection.getRowPrefix() + "/accumulo/wals/tserver+port/" + UUID.randomUUID());
+ m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+ m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+ m.putDelete(ReplicationSection.COLF, tableId1);
+ bw.addMutation(m);
+ bw.close();
+
+ log.info("Reading metadata second time");
+ for (Entry<Key,Value> e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+ log.info("{}", e.getKey());
+ }
+
+ bw = ReplicationTable.getBatchWriter(conn);
+ m = new Mutation(file1);
+ m.putDelete(StatusSection.NAME, tableId1);
+ bw.addMutation(m);
+ bw.close();
+
+ try {
+ t.join(5000);
+ } catch (InterruptedException e) {
+ Assert.fail("ReplicationOperations.drain did not complete");
+ }
+
+ // Drain should finish: it only waits on the files it saw at the start, and we deleted that one
+ Assert.assertTrue("Drain didn't finish", done.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
new file mode 100644
index 0000000..5668a67
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.replication.SequentialWorkAssigner;
+import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SequentialWorkAssignerIT extends ConfigurableMacBase {
+
+ private Connector conn;
+ private MockSequentialWorkAssigner assigner;
+
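+ // Subclass only widens method visibility so the test can drive the assigner directly.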
+ private static class MockSequentialWorkAssigner extends SequentialWorkAssigner {
+
+ public MockSequentialWorkAssigner(Connector conn) {
+ super(null, conn);
+ }
+
+ @Override
+ public void setConnector(Connector conn) {
+ super.setConnector(conn);
+ }
+
+ @Override
+ public void setQueuedWork(Map<String,Map<String,String>> queuedWork) {
+ super.setQueuedWork(queuedWork);
+ }
+
+ @Override
+ public void setWorkQueue(DistributedWorkQueue workQueue) {
+ super.setWorkQueue(workQueue);
+ }
+
+ @Override
+ public void setMaxQueueSize(int maxQueueSize) {
+ super.setMaxQueueSize(maxQueueSize);
+ }
+
+ @Override
+ public void createWork() {
+ super.createWork();
+ }
+
+ @Override
+ public void setZooCache(ZooCache zooCache) {
+ super.setZooCache(zooCache);
+ }
+
+ @Override
+ public void cleanupFinishedWork() {
+ super.cleanupFinishedWork();
+ }
+
+ }
+
+ @Before
+ public void init() throws Exception {
+ conn = getConnector();
+ assigner = new MockSequentialWorkAssigner(conn);
+ // grant ourselves write to the replication table
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+ ReplicationTable.setOnline(conn);
+ }
+
+ @Test
+ public void createWorkForFilesInCorrectOrder() throws Exception {
+ ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+ Text serializedTarget = target.toText();
+
+ // Create two mutations, both of which need replication work done
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ // We want the name of file2 to sort before file1
+ String filename1 = "z_file1", filename2 = "a_file1";
+ String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+ // File1 was closed before file2, however
+ Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+ Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
+
+ Mutation m = new Mutation(file1);
+ WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
+ bw.addMutation(m);
+
+ m = new Mutation(file2);
+ WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
+ bw.addMutation(m);
+
+ m = OrderSection.createMutation(file1, stat1.getCreatedTime());
+ OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
+ bw.addMutation(m);
+
+ m = OrderSection.createMutation(file2, stat2.getCreatedTime());
+ OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
+ bw.addMutation(m);
+
+ bw.close();
+
+ DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+ Map<String,Map<String,String>> queuedWork = new HashMap<>();
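+ // queuedWork maps peer name to (source table id -> queued work key), mirroring the assigner's bookkeeping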
+ assigner.setQueuedWork(queuedWork);
+ assigner.setWorkQueue(workQueue);
+ assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+ // Expect the invocations in the correct order (Accumulo returns rows in sorted order)
+ workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), file1);
+ expectLastCall().once();
+
+ // file2 is *not* queued because file1 must be replicated first
+
+ replay(workQueue);
+
+ assigner.createWork();
+
+ verify(workQueue);
+
+ Assert.assertEquals(1, queuedWork.size());
+ Assert.assertTrue(queuedWork.containsKey("cluster1"));
+ Map<String,String> cluster1Work = queuedWork.get("cluster1");
+ Assert.assertEquals(1, cluster1Work.size());
+ Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId()));
+ }
+
+ @Test
+ public void workAcrossTablesHappensConcurrently() throws Exception {
+ ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1");
+ Text serializedTarget1 = target1.toText();
+
+ ReplicationTarget target2 = new ReplicationTarget("cluster1", "table2", "2");
+ Text serializedTarget2 = target2.toText();
+
+ // Create two mutations, both of which need replication work done
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ // We want the name of file2 to sort before file1
+ String filename1 = "z_file1", filename2 = "a_file1";
+ String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+ // File1 was closed before file2, however
+ Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+ Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
+
+ Mutation m = new Mutation(file1);
+ WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
+ bw.addMutation(m);
+
+ m = new Mutation(file2);
+ WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
+ bw.addMutation(m);
+
+ m = OrderSection.createMutation(file1, stat1.getCreatedTime());
+ OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
+ bw.addMutation(m);
+
+ m = OrderSection.createMutation(file2, stat2.getCreatedTime());
+ OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
+ bw.addMutation(m);
+
+ bw.close();
+
+ DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+ Map<String,Map<String,String>> queuedWork = new HashMap<>();
+ assigner.setQueuedWork(queuedWork);
+ assigner.setWorkQueue(workQueue);
+ assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+ // Expect the invocations in the correct order (Accumulo returns rows in sorted order)
+ workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1);
+ expectLastCall().once();
+
+ workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2);
+ expectLastCall().once();
+
+ // file2 is also queued because it replicates to a different table than file1
+
+ replay(workQueue);
+
+ assigner.createWork();
+
+ verify(workQueue);
+
+ Assert.assertEquals(1, queuedWork.size());
+ Assert.assertTrue(queuedWork.containsKey("cluster1"));
+
+ Map<String,String> cluster1Work = queuedWork.get("cluster1");
+ Assert.assertEquals(2, cluster1Work.size());
+ Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
+
+ Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId()));
+ }
+
+ @Test
+ public void workAcrossPeersHappensConcurrently() throws Exception {
+ ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1");
+ Text serializedTarget1 = target1.toText();
+
+ ReplicationTarget target2 = new ReplicationTarget("cluster2", "table1", "1");
+ Text serializedTarget2 = target2.toText();
+
+ // Create two mutations, both of which need replication work done
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ // We want the name of file2 to sort before file1
+ String filename1 = "z_file1", filename2 = "a_file1";
+ String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+ // File1 was closed before file2, however
+ Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+ Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
+
+ Mutation m = new Mutation(file1);
+ WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
+ bw.addMutation(m);
+
+ m = new Mutation(file2);
+ WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
+ bw.addMutation(m);
+
+ m = OrderSection.createMutation(file1, stat1.getCreatedTime());
+ OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
+ bw.addMutation(m);
+
+ m = OrderSection.createMutation(file2, stat2.getCreatedTime());
+ OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
+ bw.addMutation(m);
+
+ bw.close();
+
+ DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+ Map<String,Map<String,String>> queuedWork = new HashMap<>();
+ assigner.setQueuedWork(queuedWork);
+ assigner.setWorkQueue(workQueue);
+ assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+ // Make sure we expect the invocations in the correct order (Accumulo sorts keys lexicographically)
+ workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1);
+ expectLastCall().once();
+
+ workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2);
+ expectLastCall().once();
+
+ // Both files should be queued: they replicate to different peers, so ordering constraints do not apply across them
+
+ replay(workQueue);
+
+ assigner.createWork();
+
+ verify(workQueue);
+
+ Assert.assertEquals(2, queuedWork.size());
+ Assert.assertTrue(queuedWork.containsKey("cluster1"));
+
+ Map<String,String> cluster1Work = queuedWork.get("cluster1");
+ Assert.assertEquals(1, cluster1Work.size());
+ Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
+
+ Map<String,String> cluster2Work = queuedWork.get("cluster2");
+ Assert.assertEquals(1, cluster2Work.size());
+ Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId()));
+ }
+
+ @Test
+ public void reprocessingOfCompletedWorkRemovesWork() throws Exception {
+ ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+ Text serializedTarget = target.toText();
+
+ // Create two mutations, both of which need replication work done
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ // We want the name of file2 to sort before file1
+ String filename1 = "z_file1", filename2 = "a_file1";
+ String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+ // File1 was closed before file2; with begin == end and closed set, file1 is also fully replicated
+ Status stat1 = Status.newBuilder().setBegin(100).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+ Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
+
+ Mutation m = new Mutation(file1);
+ WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
+ bw.addMutation(m);
+
+ m = new Mutation(file2);
+ WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
+ bw.addMutation(m);
+
+ m = OrderSection.createMutation(file1, stat1.getCreatedTime());
+ OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
+ bw.addMutation(m);
+
+ m = OrderSection.createMutation(file2, stat2.getCreatedTime());
+ OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
+ bw.addMutation(m);
+
+ bw.close();
+
+ DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+
+ // Treat filename1 as if it had already been submitted for replication
+ Map<String,Map<String,String>> queuedWork = new HashMap<>();
+ Map<String,String> queuedWorkForCluster = new HashMap<>();
+ queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target));
+ queuedWork.put("cluster1", queuedWorkForCluster);
+
+ assigner.setQueuedWork(queuedWork);
+ assigner.setWorkQueue(workQueue);
+ assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+ // Make sure we expect the invocations in the correct order (Accumulo sorts keys lexicographically)
+ workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), file2);
+ expectLastCall().once();
+
+ // Only file2 should be queued: file1 is fully replicated, so its entry is removed from the queued work
+
+ replay(workQueue);
+
+ assigner.createWork();
+
+ verify(workQueue);
+
+ Assert.assertEquals(1, queuedWork.size());
+ Assert.assertTrue(queuedWork.containsKey("cluster1"));
+ Map<String,String> cluster1Work = queuedWork.get("cluster1");
+ Assert.assertEquals(1, cluster1Work.size());
+ Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
new file mode 100644
index 0000000..cb34ed2
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.replication.StatusMaker;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.util.ReplicationTableUtil;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+public class StatusMakerIT extends ConfigurableMacBase {
+
+ private Connector conn;
+
+ @Before
+ public void setupInstance() throws Exception {
+ conn = getConnector();
+ ReplicationTable.setOnline(conn);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+ }
+
+ @Test
+ public void statusRecordsCreated() throws Exception {
+ String sourceTable = testName.getMethodName();
+ conn.tableOperations().create(sourceTable);
+ ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+ BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+ String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+ Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+ walPrefix + UUID.randomUUID());
+ Map<String,Integer> fileToTableId = new HashMap<>();
+
+ int index = 1;
+ long timeCreated = 0;
+ Map<String,Long> fileToTimeCreated = new HashMap<>();
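+ // Each mutation mimics a replication entry in the source table: row = ReplicationSection prefix + WAL path, column qualifier = table id, value = a "file created" Status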
+ for (String file : files) {
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+ m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated));
+ fileToTimeCreated.put(file, timeCreated);
+ bw.addMutation(m);
+ fileToTableId.put(file, index);
+ index++;
+ timeCreated++;
+ }
+
+ bw.close();
+
+ StatusMaker statusMaker = new StatusMaker(conn);
+ statusMaker.setSourceTableName(sourceTable);
+
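+ // StatusMaker should copy each replication entry from the source table into the replication table's status section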
+ statusMaker.run();
+
+ Scanner s = ReplicationTable.getScanner(conn);
+ StatusSection.limit(s);
+ Text file = new Text(), tableId = new Text();
+ for (Entry<Key,Value> entry : s) {
+ StatusSection.getFile(entry.getKey(), file);
+ StatusSection.getTableId(entry.getKey(), tableId);
+
+ Assert.assertTrue("Found unexpected file: " + file, files.contains(file.toString()));
+ Assert.assertEquals(fileToTableId.get(file.toString()), Integer.valueOf(tableId.toString()));
+ timeCreated = fileToTimeCreated.get(file.toString());
+ Assert.assertNotNull(timeCreated);
+ Assert.assertEquals(StatusUtil.fileCreated(timeCreated), Status.parseFrom(entry.getValue().get()));
+ }
+ }
+
+ @Test
+ public void openMessagesAreNotDeleted() throws Exception {
+ String sourceTable = testName.getMethodName();
+ conn.tableOperations().create(sourceTable);
+ ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+ BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+ String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+ Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+ walPrefix + UUID.randomUUID());
+ Map<String,Integer> fileToTableId = new HashMap<>();
+
+ int index = 1;
+ long timeCreated = 0;
+ for (String file : files) {
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+ m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated));
+ bw.addMutation(m);
+ fileToTableId.put(file, index);
+ index++;
+ timeCreated++;
+ }
+
+ bw.close();
+
+ StatusMaker statusMaker = new StatusMaker(conn);
+ statusMaker.setSourceTableName(sourceTable);
+
+ statusMaker.run();
+
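+ // The entries should survive the run because their statuses are still open (not closed)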
+ Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+ s.setRange(ReplicationSection.getRange());
+ s.fetchColumnFamily(ReplicationSection.COLF);
+ Assert.assertEquals(files.size(), Iterables.size(s));
+ }
+
+ @Test
+ public void closedMessagesAreDeleted() throws Exception {
+ String sourceTable = testName.getMethodName();
+ conn.tableOperations().create(sourceTable);
+ ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+ BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+ String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+ Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+ walPrefix + UUID.randomUUID());
+ Map<String,Integer> fileToTableId = new HashMap<>();
+
+ Status stat = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).setCreatedTime(System.currentTimeMillis()).build();
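+ // closed=true marks the WAL as done being written, which should let StatusMaker delete these entries from the source table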
+
+ int index = 1;
+ for (String file : files) {
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+ m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), ProtobufUtil.toValue(stat));
+ bw.addMutation(m);
+ fileToTableId.put(file, index);
+ index++;
+ }
+
+ bw.close();
+
+ StatusMaker statusMaker = new StatusMaker(conn);
+ statusMaker.setSourceTableName(sourceTable);
+
+ statusMaker.run();
+
+ Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+ s.setRange(ReplicationSection.getRange());
+ s.fetchColumnFamily(ReplicationSection.COLF);
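+ // Dump whatever is left for debugging; this loop consumes the scanner, so a fresh one is created for the assertion below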
+ for (Entry<Key,Value> e : s) {
+ System.out.println(e.getKey().toStringNoTruncate() + " " + e.getValue());
+ }
+ s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+ s.setRange(ReplicationSection.getRange());
+ s.fetchColumnFamily(ReplicationSection.COLF);
+ Assert.assertEquals(0, Iterables.size(s));
+ }
+
+ @Test
+ public void closedMessagesCreateOrderRecords() throws Exception {
+ String sourceTable = testName.getMethodName();
+ conn.tableOperations().create(sourceTable);
+ ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+ BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+ String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+ List<String> files = Arrays.asList(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+ walPrefix + UUID.randomUUID());
+ Map<String,Integer> fileToTableId = new HashMap<>();
+
+ Status.Builder statBuilder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true);
+
+ int index = 1;
+ long time = System.currentTimeMillis();
+ for (String file : files) {
+ statBuilder.setCreatedTime(time++);
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+ m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), ProtobufUtil.toValue(statBuilder.build()));
+ bw.addMutation(m);
+ fileToTableId.put(file, index);
+ index++;
+ }
+
+ bw.close();
+
+ StatusMaker statusMaker = new StatusMaker(conn);
+ statusMaker.setSourceTableName(sourceTable);
+
+ statusMaker.run();
+
+ Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+ s.setRange(ReplicationSection.getRange());
+ s.fetchColumnFamily(ReplicationSection.COLF);
+ Assert.assertEquals(0, Iterables.size(s));
+
+ s = ReplicationTable.getScanner(conn);
+ OrderSection.limit(s);
+ Iterator<Entry<Key,Value>> iter = s.iterator();
+ Assert.assertTrue("Found no order records in replication table", iter.hasNext());
+
+ Iterator<String> expectedFiles = files.iterator();
+ Text buff = new Text();
+ while (expectedFiles.hasNext() && iter.hasNext()) {
+ String file = expectedFiles.next();
+ Entry<Key,Value> entry = iter.next();
+
+ Assert.assertEquals(file, OrderSection.getFile(entry.getKey(), buff));
+ OrderSection.getTableId(entry.getKey(), buff);
+ Assert.assertEquals(fileToTableId.get(file).intValue(), Integer.parseInt(buff.toString()));
+ }
+
+ Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
+ Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java
new file mode 100644
index 0000000..048fa94
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.replication.UnorderedWorkAssigner;
+import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class UnorderedWorkAssignerIT extends ConfigurableMacBase {
+
+ private Connector conn;
+ private MockUnorderedWorkAssigner assigner;
+
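+ // Widens the visibility of the assigner's protected hooks so the test can drive them directly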
+ private static class MockUnorderedWorkAssigner extends UnorderedWorkAssigner {
+ public MockUnorderedWorkAssigner(Connector conn) {
+ super(null, conn);
+ }
+
+ @Override
+ protected void setQueuedWork(Set<String> queuedWork) {
+ super.setQueuedWork(queuedWork);
+ }
+
+ @Override
+ protected void setWorkQueue(DistributedWorkQueue workQueue) {
+ super.setWorkQueue(workQueue);
+ }
+
+ @Override
+ protected boolean queueWork(Path path, ReplicationTarget target) {
+ return super.queueWork(path, target);
+ }
+
+ @Override
+ protected void initializeQueuedWork() {
+ super.initializeQueuedWork();
+ }
+
+ @Override
+ protected Set<String> getQueuedWork() {
+ return super.getQueuedWork();
+ }
+
+ @Override
+ protected void setConnector(Connector conn) {
+ super.setConnector(conn);
+ }
+
+ @Override
+ protected void setMaxQueueSize(int maxQueueSize) {
+ super.setMaxQueueSize(maxQueueSize);
+ }
+
+ @Override
+ protected void createWork() {
+ super.createWork();
+ }
+
+ @Override
+ protected void setZooCache(ZooCache zooCache) {
+ super.setZooCache(zooCache);
+ }
+
+ @Override
+ protected void cleanupFinishedWork() {
+ super.cleanupFinishedWork();
+ }
+ }
+
+ @Before
+ public void init() throws Exception {
+ conn = getConnector();
+ assigner = new MockUnorderedWorkAssigner(conn);
+ ReplicationTable.setOnline(conn);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+ }
+
+ @Test
+ public void createWorkForFilesNeedingIt() throws Exception {
+ ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
+ Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
+ String keyTarget1 = target1.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target1.getRemoteIdentifier()
+ + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target1.getSourceTableId(), keyTarget2 = target2.getPeerName()
+ + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target2.getRemoteIdentifier() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR
+ + target2.getSourceTableId();
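+ // These keys join peer name, remote identifier, and source table id with KEY_SEPARATOR, mirroring what
+ // DistributedWorkQueueWorkAssignerHelper.getQueueKey is expected to produce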
+
+ Status.Builder builder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).setCreatedTime(5L);
+ Status status1 = builder.build();
+ builder.setCreatedTime(10L);
+ Status status2 = builder.build();
+
+ // Create two mutations, both of which need replication work done
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
+ String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+ Mutation m = new Mutation(file1);
+ WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(status1));
+ bw.addMutation(m);
+ m = OrderSection.createMutation(file1, status1.getCreatedTime());
+ OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(status1));
+ bw.addMutation(m);
+
+ m = new Mutation(file2);
+ WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(status2));
+ bw.addMutation(m);
+ m = OrderSection.createMutation(file2, status2.getCreatedTime());
+ OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(status2));
+ bw.addMutation(m);
+
+ bw.close();
+
+ DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+ HashSet<String> queuedWork = new HashSet<>();
+ assigner.setQueuedWork(queuedWork);
+ assigner.setWorkQueue(workQueue);
+ assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+ // Make sure we expect the invocations in the order they were created
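+ // The "|" literal below is assumed to match DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR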
+ String key = filename1 + "|" + keyTarget1;
+ workQueue.addWork(key, file1);
+ expectLastCall().once();
+
+ key = filename2 + "|" + keyTarget2;
+ workQueue.addWork(key, file2);
+ expectLastCall().once();
+
+ replay(workQueue);
+
+ assigner.createWork();
+
+ verify(workQueue);
+ }
+
+ @Test
+ public void doNotCreateWorkForFilesNotNeedingIt() throws Exception {
+ ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
+ Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
+
+ // Create two mutations, neither of which needs replication work done (their statuses are open with nothing to replicate)
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
+ String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+ Mutation m = new Mutation(file1);
+ WorkSection.add(m, serializedTarget1, StatusUtil.fileCreatedValue(5));
+ bw.addMutation(m);
+
+ m = new Mutation(file2);
+ WorkSection.add(m, serializedTarget2, StatusUtil.fileCreatedValue(10));
+ bw.addMutation(m);
+
+ bw.close();
+
+ DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+ HashSet<String> queuedWork = new HashSet<>();
+ assigner.setQueuedWork(queuedWork);
+ // Wire the mock queue so an unexpected addWork call fails mock verification instead of surfacing as an NPE
+ assigner.setWorkQueue(workQueue);
+ assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+ replay(workQueue);
+
+ assigner.createWork();
+
+ verify(workQueue);
+ }
+
+ @Test
+ public void workNotReAdded() throws Exception {
+ Set<String> queuedWork = new HashSet<>();
+
+ assigner.setQueuedWork(queuedWork);
+
+ ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+ String serializedTarget = target.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getRemoteIdentifier()
+ + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getSourceTableId();
+
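+ // Pre-seed the queued work so the assigner treats wal1 as already submitted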
+ queuedWork.add("wal1|" + serializedTarget);
+
+ // Create one mutation for the file whose work is already queued
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ String file1 = "/accumulo/wal/tserver+port/wal1";
+ Mutation m = new Mutation(file1);
+ WorkSection.add(m, target.toText(), StatusUtil.openWithUnknownLengthValue());
+ bw.addMutation(m);
+
+ bw.close();
+
+ DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+ assigner.setWorkQueue(workQueue);
+ assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+ replay(workQueue);
+
+ assigner.createWork();
+
+ verify(workQueue);
+ }
+}