Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/07/30 23:51:39 UTC
[05/14] accumulo git commit: ACCUMULO-3920 Convert more tests from mock
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
deleted file mode 100644
index a8fe771..0000000
--- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.BatchDeleter;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.master.thrift.MasterState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.master.state.MergeStats;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.conf.ServerConfigurationFactory;
-import org.apache.accumulo.server.master.state.Assignment;
-import org.apache.accumulo.server.master.state.CurrentState;
-import org.apache.accumulo.server.master.state.MergeInfo;
-import org.apache.accumulo.server.master.state.MergeState;
-import org.apache.accumulo.server.master.state.MetaDataStateStore;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletLocationState;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.net.HostAndPort;
-
-public class TestMergeState {
-
- class MockCurrentState implements CurrentState {
-
- TServerInstance someTServer = new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), 0x123456);
- MergeInfo mergeInfo;
-
- MockCurrentState(MergeInfo info) {
- this.mergeInfo = info;
- }
-
- @Override
- public Set<String> onlineTables() {
- return Collections.singleton("t");
- }
-
- @Override
- public Set<TServerInstance> onlineTabletServers() {
- return Collections.singleton(someTServer);
- }
-
- @Override
- public Collection<MergeInfo> merges() {
- return Collections.singleton(mergeInfo);
- }
-
- @Override
- public Collection<KeyExtent> migrations() {
- return Collections.emptyList();
- }
-
- @Override
- public MasterState getMasterState() {
- return MasterState.NORMAL;
- }
-
- @Override
- public Set<TServerInstance> shutdownServers() {
- return Collections.emptySet();
- }
- }
-
- private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException {
- BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- bw.addMutation(m);
- bw.close();
- }
-
- @Rule
- public TestName test = new TestName();
-
- @Test
- public void test() throws Exception {
- Instance instance = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(instance));
- Connector connector = context.getConnector();
- BatchWriter bw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-
- // Create a fake METADATA table with these splits
- String splits[] = {"a", "e", "j", "o", "t", "z"};
- // create metadata for a table "t" with the splits above
- Text tableId = new Text("t");
- Text pr = null;
- for (String s : splits) {
- Text split = new Text(s);
- Mutation prevRow = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, split, pr));
- prevRow.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
- ChoppedColumnFamily.CHOPPED_COLUMN.put(prevRow, new Value("junk".getBytes()));
- bw.addMutation(prevRow);
- pr = split;
- }
- // Add the default tablet
- Mutation defaultTablet = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, null, pr));
- defaultTablet.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
- bw.addMutation(defaultTablet);
- bw.close();
-
- // Read out the TabletLocationStates
- MockCurrentState state = new MockCurrentState(new MergeInfo(new KeyExtent(tableId, new Text("p"), new Text("e")), MergeInfo.Operation.MERGE));
-
- // Verify the tablet state: hosted, and count
- MetaDataStateStore metaDataStateStore = new MetaDataStateStore(context, state);
- int count = 0;
- for (TabletLocationState tss : metaDataStateStore) {
- if (tss != null)
- count++;
- }
- Assert.assertEquals(0, count); // the normal case is to skip tablets in a good state
-
- // Create the hole
- // Split the tablet at one end of the range
- Mutation m = new KeyExtent(tableId, new Text("t"), new Text("p")).getPrevRowUpdateMutation();
- TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
- TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(new Text("o")));
- update(connector, m);
-
- // do the state check
- MergeStats stats = scan(state, metaDataStateStore);
- MergeState newState = stats.nextMergeState(connector, state);
- Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
-
- // unassign the tablets
- BatchDeleter deleter = connector.createBatchDeleter(MetadataTable.NAME, Authorizations.EMPTY, 1000, new BatchWriterConfig());
- deleter.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
- deleter.setRanges(Collections.singletonList(new Range()));
- deleter.delete();
-
- // now we should be ready to merge, but we have inconsistent metadata
- stats = scan(state, metaDataStateStore);
- Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
-
- // finish the split
- KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o"));
- m = tablet.getPrevRowUpdateMutation();
- TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
- update(connector, m);
- metaDataStateStore.setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer)));
-
- // oh no... there's a new tablet online
- stats = scan(state, metaDataStateStore);
- Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(connector, state));
-
- // chop it
- m = tablet.getPrevRowUpdateMutation();
- ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("junk".getBytes()));
- update(connector, m);
-
- stats = scan(state, metaDataStateStore);
- Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
-
- // take it offline
- m = tablet.getPrevRowUpdateMutation();
- Collection<Collection<String>> walogs = Collections.emptyList();
- metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null);
-
- // now we can merge
- stats = scan(state, metaDataStateStore);
- Assert.assertEquals(MergeState.MERGING, stats.nextMergeState(connector, state));
-
- }
-
- private MergeStats scan(MockCurrentState state, MetaDataStateStore metaDataStateStore) {
- MergeStats stats = new MergeStats(state.mergeInfo);
- stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE);
- for (TabletLocationState tss : metaDataStateStore) {
- stats.update(tss.extent, tss.getState(state.onlineTabletServers()), tss.chopped, false);
- }
- return stats;
- }
-}
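A note on the conversion: this test is removed because it is built on the deprecated MockInstance; its merge-state coverage presumably moves to an integration test that runs against a real local cluster. A minimal sketch of that setup change, assuming the public MiniAccumuloCluster API (the class and method names below are illustrative, not the actual replacement test):

import java.io.File;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;

public class MergeStateSetupSketch {
  // Replaces "new MockInstance(test.getMethodName())" with a connector to a
  // real, single-process cluster started in a temporary directory.
  public static Connector connectorForTest(File tmpDir) throws Exception {
    MiniAccumuloCluster cluster = new MiniAccumuloCluster(tmpDir, "secret");
    cluster.start();
    return cluster.getConnector("root", "secret");
  }
}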
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
deleted file mode 100644
index 864a79d..0000000
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.replication;
-
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.Iterables;
-
-public class FinishedWorkUpdaterTest {
-
- @Rule
- public TestName test = new TestName();
-
- private Connector conn;
- private FinishedWorkUpdater updater;
-
- @Before
- public void setup() throws Exception {
- Instance inst = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
- conn = inst.getConnector("root", new PasswordToken(""));
- updater = new FinishedWorkUpdater(conn);
- }
-
- @Test
- public void offlineReplicationTableFailsGracefully() {
- updater.run();
- }
-
- @Test
- public void recordsWithProgressUpdateBothTables() throws Exception {
- String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
- Status stat = Status.newBuilder().setBegin(100).setEnd(200).setClosed(true).setInfiniteEnd(false).build();
- ReplicationTarget target = new ReplicationTarget("peer", "table1", "1");
-
- // Create a single work record for a file to some peer
- BatchWriter bw = ReplicationTable.getBatchWriter(conn);
- Mutation m = new Mutation(file);
- WorkSection.add(m, target.toText(), ProtobufUtil.toValue(stat));
- bw.addMutation(m);
- bw.close();
-
- updater.run();
-
- Scanner s = ReplicationTable.getScanner(conn);
- s.setRange(Range.exact(file));
- StatusSection.limit(s);
- Entry<Key,Value> entry = Iterables.getOnlyElement(s);
-
- Assert.assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME);
- Assert.assertEquals(entry.getKey().getColumnQualifier().toString(), target.getSourceTableId());
-
- // We only rely on the begin attribute being returned correctly
- Status actual = Status.parseFrom(entry.getValue().get());
- Assert.assertEquals(stat.getBegin(), actual.getBegin());
- }
-
- @Test
- public void chooseMinimumBeginOffset() throws Exception {
- String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
- // @formatter:off
- Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(),
- stat2 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(),
- stat3 = Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(false).build();
- ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"),
- target2 = new ReplicationTarget("peer2", "table2", "1"),
- target3 = new ReplicationTarget("peer3", "table3", "1");
- // @formatter:on
-
- // Create a single work record for a file to some peer
- BatchWriter bw = ReplicationTable.getBatchWriter(conn);
- Mutation m = new Mutation(file);
- WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1));
- WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2));
- WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3));
- bw.addMutation(m);
- bw.close();
-
- updater.run();
-
- Scanner s = ReplicationTable.getScanner(conn);
- s.setRange(Range.exact(file));
- StatusSection.limit(s);
- Entry<Key,Value> entry = Iterables.getOnlyElement(s);
-
- Assert.assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME);
- Assert.assertEquals(entry.getKey().getColumnQualifier().toString(), target1.getSourceTableId());
-
- // We only rely on the begin attribute being returned correctly
- Status actual = Status.parseFrom(entry.getValue().get());
- Assert.assertEquals(1, actual.getBegin());
- }
-
- @Test
- public void chooseMinimumBeginOffsetInfiniteEnd() throws Exception {
- String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
- // @formatter:off
- Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(),
- stat2 = Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(),
- stat3 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(true).build();
- ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"),
- target2 = new ReplicationTarget("peer2", "table2", "1"),
- target3 = new ReplicationTarget("peer3", "table3", "1");
- // @formatter:on
-
- // Create a single work record for a file to some peer
- BatchWriter bw = ReplicationTable.getBatchWriter(conn);
- Mutation m = new Mutation(file);
- WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1));
- WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2));
- WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3));
- bw.addMutation(m);
- bw.close();
-
- updater.run();
-
- Scanner s = ReplicationTable.getScanner(conn);
- s.setRange(Range.exact(file));
- StatusSection.limit(s);
- Entry<Key,Value> entry = Iterables.getOnlyElement(s);
-
- Assert.assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME);
- Assert.assertEquals(entry.getKey().getColumnQualifier().toString(), target1.getSourceTableId());
-
- // We only rely on the begin attribute being returned correctly
- Status actual = Status.parseFrom(entry.getValue().get());
- Assert.assertEquals(1, actual.getBegin());
- }
-
-}
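All three positive tests above assert the same invariant: FinishedWorkUpdater collapses a file's per-target Status entries into the minimum begin offset, because only data before the smallest begin has been replicated to every peer. A self-contained sketch of that reduction, with plain longs standing in for the protobuf Status fields:

public class MinimumBeginSketch {
  // Collapse per-target begin offsets into the overall progress for one file.
  static long combinedBegin(long... begins) {
    long min = Long.MAX_VALUE;
    for (long b : begins) {
      min = Math.min(min, b);
    }
    return min;
  }

  public static void main(String[] args) {
    // Mirrors chooseMinimumBeginOffset above: targets began at 100, 500, and 1.
    System.out.println(combinedBegin(100, 500, 1)); // prints 1
  }
}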
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
deleted file mode 100644
index 2555077..0000000
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.replication;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.server.replication.StatusUtil;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.hadoop.io.Text;
-import org.easymock.EasyMock;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.Iterables;
-
-public class RemoveCompleteReplicationRecordsTest {
-
- private RemoveCompleteReplicationRecords rcrr;
- private Instance inst;
- private Connector conn;
-
- @Rule
- public TestName test = new TestName();
-
- @Before
- public void initialize() throws Exception {
- inst = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
- conn = inst.getConnector("root", new PasswordToken(""));
- rcrr = new RemoveCompleteReplicationRecords(conn);
- }
-
- @Test
- public void notYetReplicationRecordsIgnored() throws Exception {
- BatchWriter bw = ReplicationTable.getBatchWriter(conn);
- int numRecords = 3;
- for (int i = 0; i < numRecords; i++) {
- String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
- Mutation m = new Mutation(file);
- StatusSection.add(m, new Text(Integer.toString(i)), StatusUtil.openWithUnknownLengthValue());
- bw.addMutation(m);
- }
-
- bw.close();
-
- Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-
- BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
- bs.setRanges(Collections.singleton(new Range()));
- IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
- bs.addScanIterator(cfg);
- bw = EasyMock.createMock(BatchWriter.class);
-
- EasyMock.replay(bw);
-
- rcrr.removeCompleteRecords(conn, bs, bw);
- bs.close();
-
- Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
- }
-
- @Test
- public void partiallyReplicatedRecordsIgnored() throws Exception {
- BatchWriter bw = ReplicationTable.getBatchWriter(conn);
- int numRecords = 3;
- Status.Builder builder = Status.newBuilder();
- builder.setClosed(false);
- builder.setEnd(10000);
- builder.setInfiniteEnd(false);
- for (int i = 0; i < numRecords; i++) {
- String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
- Mutation m = new Mutation(file);
- StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
- bw.addMutation(m);
- }
-
- bw.close();
-
- Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-
- BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
- bs.setRanges(Collections.singleton(new Range()));
- IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
- bs.addScanIterator(cfg);
- bw = EasyMock.createMock(BatchWriter.class);
-
- EasyMock.replay(bw);
-
- // We don't remove any records, so we can just pass in a fake BW for both
- rcrr.removeCompleteRecords(conn, bs, bw);
- bs.close();
-
- Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
- }
-
- @Test
- public void replicatedClosedWorkRecordsAreNotRemovedWithoutClosedStatusRecords() throws Exception {
- BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
- int numRecords = 3;
-
- Status.Builder builder = Status.newBuilder();
- builder.setClosed(false);
- builder.setEnd(10000);
- builder.setInfiniteEnd(false);
-
- // Write out numRecords replication entries, none of which are fully replicated
- for (int i = 0; i < numRecords; i++) {
- String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
- Mutation m = new Mutation(file);
- StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
- replBw.addMutation(m);
- }
-
- // Add two records that we can delete
- String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
- Mutation m = new Mutation(fileToRemove);
- StatusSection.add(m, new Text("5"), ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(false).build()));
- replBw.addMutation(m);
-
- numRecords++;
-
- fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
- m = new Mutation(fileToRemove);
- StatusSection.add(m, new Text("6"), ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(false).build()));
- replBw.addMutation(m);
-
- numRecords++;
-
- replBw.flush();
-
- // Make sure that we have the expected number of records in the replication table
- Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-
- // We should not remove any records because they're missing closed status
- BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
- bs.setRanges(Collections.singleton(new Range()));
- IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
- bs.addScanIterator(cfg);
-
- try {
- Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, bs, replBw));
- } finally {
- bs.close();
- replBw.close();
- }
- }
-
- @Test
- public void replicatedClosedRowsAreRemoved() throws Exception {
- BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
- int numRecords = 3;
-
- Status.Builder builder = Status.newBuilder();
- builder.setClosed(false);
- builder.setEnd(10000);
- builder.setInfiniteEnd(false);
-
- long time = System.currentTimeMillis();
- // Write out numRecords replication entries, none of which are fully replicated
- for (int i = 0; i < numRecords; i++) {
- builder.setCreatedTime(time++);
- String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
- Mutation m = new Mutation(file);
- Value v = ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build());
- StatusSection.add(m, new Text(Integer.toString(i)), v);
- replBw.addMutation(m);
- m = OrderSection.createMutation(file, time);
- OrderSection.add(m, new Text(Integer.toString(i)), v);
- replBw.addMutation(m);
- }
-
- Set<String> filesToRemove = new HashSet<>();
- // We created two mutations for each file
- numRecords *= 2;
- int finalNumRecords = numRecords;
-
- // Add two records that we can delete
- String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
- filesToRemove.add(fileToRemove);
- Mutation m = new Mutation(fileToRemove);
- ReplicationTarget target = new ReplicationTarget("peer1", "5", "5");
- Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build());
- StatusSection.add(m, new Text("5"), value);
- WorkSection.add(m, target.toText(), value);
- replBw.addMutation(m);
-
- m = OrderSection.createMutation(fileToRemove, time);
- OrderSection.add(m, new Text("5"), value);
- replBw.addMutation(m);
- time++;
-
- numRecords += 3;
-
- fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
- filesToRemove.add(fileToRemove);
- m = new Mutation(fileToRemove);
- value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build());
- target = new ReplicationTarget("peer1", "6", "6");
- StatusSection.add(m, new Text("6"), value);
- WorkSection.add(m, target.toText(), value);
- replBw.addMutation(m);
-
- m = OrderSection.createMutation(fileToRemove, time);
- OrderSection.add(m, new Text("6"), value);
- replBw.addMutation(m);
- time++;
-
- numRecords += 3;
-
- replBw.flush();
-
- // Make sure that we have the expected number of records in the replication table
- Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-
- // We should remove the two fully completed records we inserted
- BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
- bs.setRanges(Collections.singleton(new Range()));
- StatusSection.limit(bs);
- WorkSection.limit(bs);
- IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
- bs.addScanIterator(cfg);
-
- try {
- Assert.assertEquals(4l, rcrr.removeCompleteRecords(conn, bs, replBw));
- } finally {
- bs.close();
- replBw.close();
- }
-
- int actualRecords = 0;
- for (Entry<Key,Value> entry : ReplicationTable.getScanner(conn)) {
- Assert.assertFalse(filesToRemove.contains(entry.getKey().getRow().toString()));
- actualRecords++;
- }
-
- Assert.assertEquals(finalNumRecords, actualRecords);
- }
-
- @Test
- public void partiallyReplicatedEntriesPrecludeRowDeletion() throws Exception {
- BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
- int numRecords = 3;
-
- Status.Builder builder = Status.newBuilder();
- builder.setClosed(false);
- builder.setEnd(10000);
- builder.setInfiniteEnd(false);
-
- // Write out numRecords replication entries, none of which are fully replicated
- for (int i = 0; i < numRecords; i++) {
- String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
- Mutation m = new Mutation(file);
- StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
- replBw.addMutation(m);
- }
-
- // Add two records that we can delete
- String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
- Mutation m = new Mutation(fileToRemove);
- ReplicationTarget target = new ReplicationTarget("peer1", "5", "5");
- Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).build());
- StatusSection.add(m, new Text("5"), value);
- WorkSection.add(m, target.toText(), value);
- target = new ReplicationTarget("peer2", "5", "5");
- WorkSection.add(m, target.toText(), value);
- target = new ReplicationTarget("peer3", "5", "5");
- WorkSection.add(m, target.toText(), ProtobufUtil.toValue(builder.setClosed(false).build()));
- replBw.addMutation(m);
-
- numRecords += 4;
-
- replBw.flush();
-
- // Make sure that we have the expected number of records in the replication table
- Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-
- // We should not remove the row: one of its entries is still not closed
- BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
- bs.setRanges(Collections.singleton(new Range()));
- IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
- bs.addScanIterator(cfg);
-
- try {
- Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, bs, replBw));
- } finally {
- bs.close();
- replBw.close();
- }
- }
-}
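Each test above attaches a WholeRowIterator to the BatchScanner, which packs an entire row into a single Key/Value pair so that removeCompleteRecords can accept or reject a file's row atomically. A hedged sketch of the client-side decode step (rowIsComplete and isClosedAndComplete are hypothetical names, not the real implementation):

import java.io.IOException;
import java.util.Map.Entry;
import java.util.SortedMap;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;

class WholeRowDecodeSketch {
  // One encoded Key/Value from the scanner expands back into the row's columns.
  static boolean rowIsComplete(Key rowKey, Value rowValue) throws IOException {
    SortedMap<Key,Value> columns = WholeRowIterator.decodeRow(rowKey, rowValue);
    for (Entry<Key,Value> col : columns.entrySet()) {
      if (!isClosedAndComplete(col.getValue())) {
        return false; // one incomplete Status precludes deleting the whole row
      }
    }
    return true;
  }

  // Hypothetical placeholder for parsing the Replication.Status protobuf from
  // the Value and checking that it is closed and fully replicated.
  static boolean isClosedAndComplete(Value v) {
    return false;
  }
}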
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index d4675db..45fe959 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -18,260 +18,35 @@ package org.apache.accumulo.master.replication;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
-import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationConstants;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TestName;
public class SequentialWorkAssignerTest {
- @Rule
- public TestName test = new TestName();
-
- private AccumuloConfiguration conf;
private Connector conn;
- private Connector mockConn;
private SequentialWorkAssigner assigner;
@Before
public void init() throws Exception {
- conf = createMock(AccumuloConfiguration.class);
+ AccumuloConfiguration conf = createMock(AccumuloConfiguration.class);
conn = createMock(Connector.class);
assigner = new SequentialWorkAssigner(conf, conn);
-
- Instance inst = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
- mockConn = inst.getConnector("root", new PasswordToken(""));
- // Set the connector
- assigner.setConnector(mockConn);
- // grant ourselves write to the replication table
- mockConn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
- }
-
- @Test
- public void createWorkForFilesInCorrectOrder() throws Exception {
- ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
- Text serializedTarget = target.toText();
-
- // Create two mutations, both of which need replication work done
- BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
- // We want the name of file2 to sort before file1
- String filename1 = "z_file1", filename2 = "a_file1";
- String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
- // File1 was closed before file2, however
- Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
- Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
-
- Mutation m = new Mutation(file1);
- WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
- bw.addMutation(m);
-
- m = new Mutation(file2);
- WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
- bw.addMutation(m);
-
- m = OrderSection.createMutation(file1, stat1.getCreatedTime());
- OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
- bw.addMutation(m);
-
- m = OrderSection.createMutation(file2, stat2.getCreatedTime());
- OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
- bw.addMutation(m);
-
- bw.close();
-
- DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
- Map<String,Map<String,String>> queuedWork = new HashMap<>();
- assigner.setQueuedWork(queuedWork);
- assigner.setWorkQueue(workQueue);
- assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
- // Make sure we expect the invocations in the correct order (accumulo is sorted)
- workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), file1);
- expectLastCall().once();
-
- // file2 is *not* queued because file1 must be replicated first
-
- replay(workQueue);
-
- assigner.createWork();
-
- verify(workQueue);
-
- Assert.assertEquals(1, queuedWork.size());
- Assert.assertTrue(queuedWork.containsKey("cluster1"));
- Map<String,String> cluster1Work = queuedWork.get("cluster1");
- Assert.assertEquals(1, cluster1Work.size());
- Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId()));
- }
-
- @Test
- public void workAcrossTablesHappensConcurrently() throws Exception {
- ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1");
- Text serializedTarget1 = target1.toText();
-
- ReplicationTarget target2 = new ReplicationTarget("cluster1", "table2", "2");
- Text serializedTarget2 = target2.toText();
-
- // Create two mutations, both of which need replication work done
- BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
- // We want the name of file2 to sort before file1
- String filename1 = "z_file1", filename2 = "a_file1";
- String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
- // File1 was closed before file2, however
- Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
- Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
-
- Mutation m = new Mutation(file1);
- WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
- bw.addMutation(m);
-
- m = new Mutation(file2);
- WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
- bw.addMutation(m);
-
- m = OrderSection.createMutation(file1, stat1.getCreatedTime());
- OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
- bw.addMutation(m);
-
- m = OrderSection.createMutation(file2, stat2.getCreatedTime());
- OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
- bw.addMutation(m);
-
- bw.close();
-
- DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
- Map<String,Map<String,String>> queuedWork = new HashMap<>();
- assigner.setQueuedWork(queuedWork);
- assigner.setWorkQueue(workQueue);
- assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
- // Make sure we expect the invocations in the correct order (accumulo is sorted)
- workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1);
- expectLastCall().once();
-
- workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2);
- expectLastCall().once();
-
- // file2 is also queued because it belongs to a different table
-
- replay(workQueue);
-
- assigner.createWork();
-
- verify(workQueue);
-
- Assert.assertEquals(1, queuedWork.size());
- Assert.assertTrue(queuedWork.containsKey("cluster1"));
-
- Map<String,String> cluster1Work = queuedWork.get("cluster1");
- Assert.assertEquals(2, cluster1Work.size());
- Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
-
- Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId()));
- }
-
- @Test
- public void workAcrossPeersHappensConcurrently() throws Exception {
- ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1");
- Text serializedTarget1 = target1.toText();
-
- ReplicationTarget target2 = new ReplicationTarget("cluster2", "table1", "1");
- Text serializedTarget2 = target2.toText();
-
- // Create two mutations, both of which need replication work done
- BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
- // We want the name of file2 to sort before file1
- String filename1 = "z_file1", filename2 = "a_file1";
- String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
- // File1 was closed before file2, however
- Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
- Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
-
- Mutation m = new Mutation(file1);
- WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
- bw.addMutation(m);
-
- m = new Mutation(file2);
- WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
- bw.addMutation(m);
-
- m = OrderSection.createMutation(file1, stat1.getCreatedTime());
- OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
- bw.addMutation(m);
-
- m = OrderSection.createMutation(file2, stat2.getCreatedTime());
- OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
- bw.addMutation(m);
-
- bw.close();
-
- DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
- Map<String,Map<String,String>> queuedWork = new HashMap<>();
- assigner.setQueuedWork(queuedWork);
- assigner.setWorkQueue(workQueue);
- assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
- // Make sure we expect the invocations in the correct order (accumulo is sorted)
- workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1);
- expectLastCall().once();
-
- workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2);
- expectLastCall().once();
-
- // file2 is also queued because it goes to a different peer
-
- replay(workQueue);
-
- assigner.createWork();
-
- verify(workQueue);
-
- Assert.assertEquals(2, queuedWork.size());
- Assert.assertTrue(queuedWork.containsKey("cluster1"));
-
- Map<String,String> cluster1Work = queuedWork.get("cluster1");
- Assert.assertEquals(1, cluster1Work.size());
- Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
-
- Map<String,String> cluster2Work = queuedWork.get("cluster2");
- Assert.assertEquals(1, cluster2Work.size());
- Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId()));
}
@Test
@@ -315,69 +90,4 @@ public class SequentialWorkAssignerTest {
Assert.assertEquals(1, cluster1Work.size());
Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")), cluster1Work.get("2"));
}
-
- @Test
- public void reprocessingOfCompletedWorkRemovesWork() throws Exception {
- ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
- Text serializedTarget = target.toText();
-
- // Create two mutations, both of which need replication work done
- BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
- // We want the name of file2 to sort before file1
- String filename1 = "z_file1", filename2 = "a_file1";
- String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
- // File1 was closed before file2, however
- Status stat1 = Status.newBuilder().setBegin(100).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
- Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
-
- Mutation m = new Mutation(file1);
- WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
- bw.addMutation(m);
-
- m = new Mutation(file2);
- WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
- bw.addMutation(m);
-
- m = OrderSection.createMutation(file1, stat1.getCreatedTime());
- OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
- bw.addMutation(m);
-
- m = OrderSection.createMutation(file2, stat2.getCreatedTime());
- OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
- bw.addMutation(m);
-
- bw.close();
-
- DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-
- // Treat filename1 as if we had already submitted it for replication
- Map<String,Map<String,String>> queuedWork = new HashMap<>();
- Map<String,String> queuedWorkForCluster = new HashMap<>();
- queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target));
- queuedWork.put("cluster1", queuedWorkForCluster);
-
- assigner.setQueuedWork(queuedWork);
- assigner.setWorkQueue(workQueue);
- assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
- // Make sure we expect the invocations in the correct order (accumulo is sorted)
- workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), file2);
- expectLastCall().once();
-
- // file2 is queued because file1, now fully replicated, is removed from the queued work
-
- replay(workQueue);
-
- assigner.createWork();
-
- verify(workQueue);
-
- Assert.assertEquals(1, queuedWork.size());
- Assert.assertTrue(queuedWork.containsKey("cluster1"));
- Map<String,String> cluster1Work = queuedWork.get("cluster1");
- Assert.assertEquals(1, cluster1Work.size());
- Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId()));
- }
}
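The methods deleted above are exactly the ones that needed a live (mock) replication table; the test that survives keeps the pure EasyMock style visible in the retained hunk. For reference, the expect/replay/verify pattern on a void method, reduced to a self-contained example (the WorkQueue interface here is illustrative, not the real DistributedWorkQueue):

import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;

public class EasyMockVoidCallSketch {
  interface WorkQueue {
    void addWork(String key, String path);
  }

  public static void main(String[] args) {
    WorkQueue queue = createMock(WorkQueue.class);

    // Record the expectation: exactly one addWork call with these arguments.
    queue.addWork("file1|cluster1|table1|1", "/accumulo/wal/tserver+port/file1");
    expectLastCall().once();
    replay(queue);

    // The code under test would make this call; verify() fails if it did not.
    queue.addWork("file1|cluster1|table1|1", "/accumulo/wal/tserver+port/file1");
    verify(queue);
  }
}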
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
deleted file mode 100644
index 11be4fb..0000000
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.replication;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.impl.Credentials;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.server.replication.StatusUtil;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.accumulo.server.util.ReplicationTableUtil;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
-public class StatusMakerTest {
-
- @Rule
- public TestName test = new TestName();
-
- private Connector conn;
-
- @Before
- public void setupInstance() throws Exception {
- Instance inst = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
- Credentials creds = new Credentials("root", new PasswordToken(""));
- conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
- }
-
- @Test
- public void statusRecordsCreated() throws Exception {
- String sourceTable = "source";
- conn.tableOperations().create(sourceTable);
- ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
-
- BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
- String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
- Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
- walPrefix + UUID.randomUUID());
- Map<String,Integer> fileToTableId = new HashMap<>();
-
- int index = 1;
- long timeCreated = 0;
- Map<String,Long> fileToTimeCreated = new HashMap<>();
- for (String file : files) {
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
- m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated));
- fileToTimeCreated.put(file, timeCreated);
- bw.addMutation(m);
- fileToTableId.put(file, index);
- index++;
- timeCreated++;
- }
-
- bw.close();
-
- StatusMaker statusMaker = new StatusMaker(conn);
- statusMaker.setSourceTableName(sourceTable);
-
- statusMaker.run();
-
- Scanner s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Text file = new Text(), tableId = new Text();
- for (Entry<Key,Value> entry : s) {
- StatusSection.getFile(entry.getKey(), file);
- StatusSection.getTableId(entry.getKey(), tableId);
-
- Assert.assertTrue("Found unexpected file: " + file, files.contains(file.toString()));
- Assert.assertEquals(fileToTableId.get(file.toString()), new Integer(tableId.toString()));
- timeCreated = fileToTimeCreated.get(file.toString());
- Assert.assertNotNull(timeCreated);
- Assert.assertEquals(StatusUtil.fileCreated(timeCreated), Status.parseFrom(entry.getValue().get()));
- }
- }
-
- @Test
- public void openMessagesAreNotDeleted() throws Exception {
- String sourceTable = "source";
- conn.tableOperations().create(sourceTable);
- ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
-
- BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
- String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
- Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
- walPrefix + UUID.randomUUID());
- Map<String,Integer> fileToTableId = new HashMap<>();
-
- int index = 1;
- long timeCreated = 0;
- for (String file : files) {
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
- m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated));
- bw.addMutation(m);
- fileToTableId.put(file, index);
- index++;
- timeCreated++;
- }
-
- bw.close();
-
- StatusMaker statusMaker = new StatusMaker(conn);
- statusMaker.setSourceTableName(sourceTable);
-
- statusMaker.run();
-
- Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
- s.fetchColumnFamily(ReplicationSection.COLF);
- Assert.assertEquals(files.size(), Iterables.size(s));
- }
-
- @Test
- public void closedMessagesAreDeleted() throws Exception {
- String sourceTable = "source";
- conn.tableOperations().create(sourceTable);
- ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
-
- BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
- String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
- Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
- walPrefix + UUID.randomUUID());
- Map<String,Integer> fileToTableId = new HashMap<>();
-
- Status stat = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).setCreatedTime(System.currentTimeMillis()).build();
-
- int index = 1;
- for (String file : files) {
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
- m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), ProtobufUtil.toValue(stat));
- bw.addMutation(m);
- fileToTableId.put(file, index);
- index++;
- }
-
- bw.close();
-
- StatusMaker statusMaker = new StatusMaker(conn);
- statusMaker.setSourceTableName(sourceTable);
-
- statusMaker.run();
-
- Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
- s.fetchColumnFamily(ReplicationSection.COLF);
- for (Entry<Key,Value> e : s) {
- System.out.println(e.getKey().toStringNoTruncate() + " " + e.getValue());
- }
- s = conn.createScanner(sourceTable, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
- s.fetchColumnFamily(ReplicationSection.COLF);
- Assert.assertEquals(0, Iterables.size(s));
-
- }
-
- @Test
- public void closedMessagesCreateOrderRecords() throws Exception {
- String sourceTable = "source";
- conn.tableOperations().create(sourceTable);
- ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
-
- BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
- String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
- List<String> files = Arrays.asList(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
- walPrefix + UUID.randomUUID());
- Map<String,Integer> fileToTableId = new HashMap<>();
-
- Status.Builder statBuilder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true);
-
- int index = 1;
- long time = System.currentTimeMillis();
- for (String file : files) {
- statBuilder.setCreatedTime(time++);
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
- m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), ProtobufUtil.toValue(statBuilder.build()));
- bw.addMutation(m);
- fileToTableId.put(file, index);
- index++;
- }
-
- bw.close();
-
- StatusMaker statusMaker = new StatusMaker(conn);
- statusMaker.setSourceTableName(sourceTable);
-
- statusMaker.run();
-
- Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
- s.fetchColumnFamily(ReplicationSection.COLF);
- Assert.assertEquals(0, Iterables.size(s));
-
- s = ReplicationTable.getScanner(conn);
- OrderSection.limit(s);
- Iterator<Entry<Key,Value>> iter = s.iterator();
- Assert.assertTrue("Found no order records in replication table", iter.hasNext());
-
- Iterator<String> expectedFiles = files.iterator();
- Text buff = new Text();
- while (expectedFiles.hasNext() && iter.hasNext()) {
- String file = expectedFiles.next();
- Entry<Key,Value> entry = iter.next();
-
- Assert.assertEquals(file, OrderSection.getFile(entry.getKey(), buff));
- OrderSection.getTableId(entry.getKey(), buff);
- Assert.assertEquals(fileToTableId.get(file).intValue(), Integer.parseInt(buff.toString()));
- }
-
- Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
- Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
- }
-}
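The four tests above all feed StatusMaker metadata entries of the same shape, so it is worth spelling that shape out once. A reconstruction from the test code above (ReplicationEntrySketch is an illustrative name, not part of the patch):

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.hadoop.io.Text;

class ReplicationEntrySketch {
  // Shape of the entries StatusMaker consumes, as written by the tests above:
  //   row:   ReplicationSection.getRowPrefix() + <WAL file URI>
  //   cf:    ReplicationSection.COLF
  //   cq:    <source table id>
  //   value: a protobuf Status (here, the "file created" form)
  static Mutation replicationEntry(String walFile, int tableId, long timeCreated) {
    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + walFile);
    m.put(ReplicationSection.COLF, new Text(Integer.toString(tableId)), StatusUtil.fileCreatedValue(timeCreated));
    return m;
  }
}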
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
index f4c53f0..a9af68b 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
@@ -30,50 +30,29 @@ import java.util.Set;
import java.util.UUID;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationConstants;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
-import org.apache.accumulo.server.replication.StatusUtil;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TestName;
public class UnorderedWorkAssignerTest {
- @Rule
- public TestName test = new TestName();
-
- private AccumuloConfiguration conf;
private Connector conn;
- private Connector mockConn;
private UnorderedWorkAssigner assigner;
@Before
public void init() throws Exception {
- conf = createMock(AccumuloConfiguration.class);
+ AccumuloConfiguration conf = createMock(AccumuloConfiguration.class);
conn = createMock(Connector.class);
assigner = new UnorderedWorkAssigner(conf, conn);
-
- Instance inst = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
- mockConn = inst.getConnector("root", new PasswordToken(""));
}
@Test
@@ -122,106 +101,6 @@ public class UnorderedWorkAssignerTest {
}
@Test
- public void createWorkForFilesNeedingIt() throws Exception {
- ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
- Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
- String keyTarget1 = target1.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target1.getRemoteIdentifier()
- + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target1.getSourceTableId(), keyTarget2 = target2.getPeerName()
- + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target2.getRemoteIdentifier() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR
- + target2.getSourceTableId();
-
- // Set the connector
- assigner.setConnector(mockConn);
-
- // Grant ourselves write permission on the replication table
- mockConn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
- Status.Builder builder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).setCreatedTime(5L);
- Status status1 = builder.build();
- builder.setCreatedTime(10L);
- Status status2 = builder.build();
-
- // Create two mutations, both of which need replication work done
- BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
- String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
- String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
- Mutation m = new Mutation(file1);
- WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(status1));
- bw.addMutation(m);
- m = OrderSection.createMutation(file1, status1.getCreatedTime());
- OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(status1));
- bw.addMutation(m);
-
- m = new Mutation(file2);
- WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(status2));
- bw.addMutation(m);
- m = OrderSection.createMutation(file2, status2.getCreatedTime());
- OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(status2));
- bw.addMutation(m);
-
- bw.close();
-
- DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
- HashSet<String> queuedWork = new HashSet<>();
- assigner.setQueuedWork(queuedWork);
- assigner.setWorkQueue(workQueue);
- assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
- // Make sure we expect the invocations in the order they were created
- String key = filename1 + "|" + keyTarget1;
- workQueue.addWork(key, file1);
- expectLastCall().once();
-
- key = filename2 + "|" + keyTarget2;
- workQueue.addWork(key, file2);
- expectLastCall().once();
-
- replay(workQueue);
-
- assigner.createWork();
-
- verify(workQueue);
- }
-
- @Test
- public void doNotCreateWorkForFilesNotNeedingIt() throws Exception {
- ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
- Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
-
- // Set the connector
- assigner.setConnector(mockConn);
-
- // Grant ourselves write permission on the replication table
- mockConn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
- // Create two mutations, neither of which needs replication work done
- BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
- String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
- String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
- Mutation m = new Mutation(file1);
- WorkSection.add(m, serializedTarget1, StatusUtil.fileCreatedValue(5));
- bw.addMutation(m);
-
- m = new Mutation(file2);
- WorkSection.add(m, serializedTarget2, StatusUtil.fileCreatedValue(10));
- bw.addMutation(m);
-
- bw.close();
-
- DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
- HashSet<String> queuedWork = new HashSet<>();
- assigner.setQueuedWork(queuedWork);
- assigner.setWorkQueue(workQueue);
- assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
- replay(workQueue);
-
- assigner.createWork();
-
- verify(workQueue);
- }
-
- @Test
public void workNotInZooKeeperIsCleanedUp() {
Set<String> queuedWork = new LinkedHashSet<>(Arrays.asList("wal1", "wal2"));
assigner.setQueuedWork(queuedWork);
@@ -243,41 +122,4 @@ public class UnorderedWorkAssignerTest {
Assert.assertTrue("Queued work was not emptied", queuedWork.isEmpty());
}
- @Test
- public void workNotReAdded() throws Exception {
- Set<String> queuedWork = new HashSet<>();
-
- assigner.setQueuedWork(queuedWork);
-
- ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
- String serializedTarget = target.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getRemoteIdentifier()
- + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getSourceTableId();
-
- queuedWork.add("wal1|" + serializedTarget);
-
- // Set the connector
- assigner.setConnector(mockConn);
-
- // Grant ourselves write permission on the replication table
- mockConn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
- // Create one mutation for a file whose work is already queued
- BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
- String file1 = "/accumulo/wal/tserver+port/wal1";
- Mutation m = new Mutation(file1);
- WorkSection.add(m, target.toText(), StatusUtil.openWithUnknownLengthValue());
- bw.addMutation(m);
-
- bw.close();
-
- DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
- assigner.setWorkQueue(workQueue);
- assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
- replay(workQueue);
-
- assigner.createWork();
-
- verify(workQueue);
- }
}
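
With MockInstance gone, the tests that remain in this class lean entirely on EasyMock's record/replay/verify cycle. A minimal sketch of the pattern as these tests use it (static imports come from org.easymock.EasyMock; the key and path literals are illustrative, and assigner is assumed in scope as in the tests above):

    import static org.easymock.EasyMock.createMock;
    import static org.easymock.EasyMock.expectLastCall;
    import static org.easymock.EasyMock.replay;
    import static org.easymock.EasyMock.verify;

    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
    workQueue.addWork("wal1|cluster1|table1|1", "/accumulo/wal/tserver+port/wal1");
    expectLastCall().once();  // record phase: exactly one addWork call is expected
    replay(workQueue);        // switch the mock into replay mode
    assigner.setWorkQueue(workQueue);
    assigner.createWork();    // exercise the code under test
    verify(workQueue);        // fail unless the recorded expectation was met
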
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
index d6d0b9a..ec849f4 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
@@ -16,185 +16,16 @@
*/
package org.apache.accumulo.master.replication;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
public class WorkMakerTest {
- private Instance instance;
- private Connector conn;
-
- @Rule
- public TestName name = new TestName();
- private AccumuloServerContext context;
-
- @Before
- public void createMockAccumulo() throws Exception {
- instance = new org.apache.accumulo.core.client.mock.MockInstance(name.getMethodName());
- context = new AccumuloServerContext(new ServerConfigurationFactory(instance));
- conn = context.getConnector();
- conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
- conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
- }
-
- @Test
- public void singleUnitSingleTarget() throws Exception {
- String table = name.getMethodName();
- conn.tableOperations().create(table);
- String tableId = conn.tableOperations().tableIdMap().get(table);
- String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
-
- // Create a status record for a file
- long timeCreated = System.currentTimeMillis();
- Mutation m = new Mutation(new Path(file).toString());
- m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileCreatedValue(timeCreated));
- BatchWriter bw = ReplicationTable.getBatchWriter(conn);
- bw.addMutation(m);
- bw.flush();
-
- // Assert that we have one record in the status section
- Scanner s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Assert.assertEquals(1, Iterables.size(s));
-
- WorkMaker workMaker = new WorkMaker(context, conn);
-
- // Invoke addWorkRecord to create a Work record from the Status record written above
- ReplicationTarget expected = new ReplicationTarget("remote_cluster_1", "4", tableId);
- workMaker.setBatchWriter(bw);
- workMaker.addWorkRecord(new Text(file), StatusUtil.fileCreatedValue(timeCreated), ImmutableMap.of("remote_cluster_1", "4"), tableId);
-
- // Scan over just the WorkSection
- s = ReplicationTable.getScanner(conn);
- WorkSection.limit(s);
-
- Entry<Key,Value> workEntry = Iterables.getOnlyElement(s);
- Key workKey = workEntry.getKey();
- ReplicationTarget actual = ReplicationTarget.from(workKey.getColumnQualifier());
-
- Assert.assertEquals(file, workKey.getRow().toString());
- Assert.assertEquals(WorkSection.NAME, workKey.getColumnFamily());
- Assert.assertEquals(expected, actual);
- Assert.assertEquals(StatusUtil.fileCreatedValue(timeCreated), workEntry.getValue());
- }
-
- @Test
- public void singleUnitMultipleTargets() throws Exception {
- String table = name.getMethodName();
- conn.tableOperations().create(table);
-
- String tableId = conn.tableOperations().tableIdMap().get(table);
-
- String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
-
- Mutation m = new Mutation(new Path(file).toString());
- m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
- BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector());
- bw.addMutation(m);
- bw.flush();
-
- // Assert that we have one record in the status section
- Scanner s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Assert.assertEquals(1, Iterables.size(s));
-
- WorkMaker workMaker = new WorkMaker(context, conn);
-
- Map<String,String> targetClusters = ImmutableMap.of("remote_cluster_1", "4", "remote_cluster_2", "6", "remote_cluster_3", "8");
- Set<ReplicationTarget> expectedTargets = new HashSet<>();
- for (Entry<String,String> cluster : targetClusters.entrySet()) {
- expectedTargets.add(new ReplicationTarget(cluster.getKey(), cluster.getValue(), tableId));
- }
- workMaker.setBatchWriter(bw);
- workMaker.addWorkRecord(new Text(file), StatusUtil.fileCreatedValue(System.currentTimeMillis()), targetClusters, tableId);
-
- s = ReplicationTable.getScanner(conn);
- WorkSection.limit(s);
-
- Set<ReplicationTarget> actualTargets = new HashSet<>();
- for (Entry<Key,Value> entry : s) {
- Assert.assertEquals(file, entry.getKey().getRow().toString());
- Assert.assertEquals(WorkSection.NAME, entry.getKey().getColumnFamily());
-
- ReplicationTarget target = ReplicationTarget.from(entry.getKey().getColumnQualifier());
- actualTargets.add(target);
- }
-
- for (ReplicationTarget expected : expectedTargets) {
- Assert.assertTrue("Did not find expected target: " + expected, actualTargets.contains(expected));
- actualTargets.remove(expected);
- }
-
- Assert.assertTrue("Found extra replication work entries: " + actualTargets, actualTargets.isEmpty());
- }
-
- @Test
- public void dontCreateWorkForEntriesWithNothingToReplicate() throws Exception {
- String table = name.getMethodName();
- conn.tableOperations().create(table);
- String tableId = conn.tableOperations().tableIdMap().get(table);
- String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
-
- Mutation m = new Mutation(new Path(file).toString());
- m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
- BatchWriter bw = ReplicationTable.getBatchWriter(conn);
- bw.addMutation(m);
- bw.flush();
-
- // Assert that we have one record in the status section
- Scanner s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Assert.assertEquals(1, Iterables.size(s));
-
- WorkMaker workMaker = new WorkMaker(context, conn);
-
- conn.tableOperations().setProperty(ReplicationTable.NAME, Property.TABLE_REPLICATION_TARGET.getKey() + "remote_cluster_1", "4");
-
- workMaker.setBatchWriter(bw);
-
- // If we don't short-circuit out, we should get an exception because ServerConfiguration.getTableConfiguration
- // won't work with MockAccumulo
- workMaker.run();
-
- s = ReplicationTable.getScanner(conn);
- WorkSection.limit(s);
-
- Assert.assertEquals(0, Iterables.size(s));
- }
-
@Test
public void closedStatusRecordsStillMakeWork() throws Exception {
- WorkMaker workMaker = new WorkMaker(context, conn);
+ WorkMaker workMaker = new WorkMaker(null, null);
Assert.assertFalse(workMaker.shouldCreateWork(StatusUtil.fileCreated(System.currentTimeMillis())));
Assert.assertTrue(workMaker.shouldCreateWork(StatusUtil.ingestedUntil(1000)));
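
The one WorkMaker test kept above exercises only shouldCreateWork, which needs neither a server context nor a connector. Its decision presumably reduces to the progress markers on the Status protobuf; a sketch of the presumed predicate (field names are from the Status builder used elsewhere in this commit, but the exact guard inside WorkMaker may differ):

    import org.apache.accumulo.server.replication.proto.Replication.Status;

    // Presumed logic: work exists when un-replicated data remains.
    // begin = offset replicated so far, end = offset ingested so far.
    static boolean shouldCreateWork(Status status) {
      if (status.getInfiniteEnd()) {
        return status.getBegin() < Long.MAX_VALUE;  // replicate everything still pending
      }
      return status.getBegin() < status.getEnd();   // bytes ingested but not yet shipped
    }
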
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/shell/src/test/java/org/apache/accumulo/shell/commands/FormatterCommandTest.java
----------------------------------------------------------------------
diff --git a/shell/src/test/java/org/apache/accumulo/shell/commands/FormatterCommandTest.java b/shell/src/test/java/org/apache/accumulo/shell/commands/FormatterCommandTest.java
deleted file mode 100644
index 02a9ee2..0000000
--- a/shell/src/test/java/org/apache/accumulo/shell/commands/FormatterCommandTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.shell.commands;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.format.Formatter;
-import org.apache.accumulo.shell.Shell;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests the shell output with Formatters
- */
-public class FormatterCommandTest {
- ByteArrayOutputStream out = null;
- InputStream in = null;
-
- @Test
- public void test() throws IOException, AccumuloException, AccumuloSecurityException, TableExistsException, ClassNotFoundException {
- // Keep the Shell AUDIT log off the test output
- Logger.getLogger(Shell.class).setLevel(Level.WARN);
-
- File config = Files.createTempFile(null, null).toFile();
- config.deleteOnExit();
- final String[] args = new String[] {"--config-file", config.toString(), "--fake", "-u", "root", "-p", ""};
-
- final String[] commands = createCommands();
-
- in = org.apache.accumulo.shell.mock.MockShell.makeCommands(commands);
- out = new ByteArrayOutputStream();
-
- final Shell shell = new org.apache.accumulo.shell.mock.MockShell(in, out);
- assertTrue("Failed to configure shell without error", shell.config(args));
-
- // Can't call createtable in the shell with MockAccumulo
- shell.getConnector().tableOperations().create("test");
-
- try {
- shell.start();
- } catch (Exception e) {
- Assert.fail("Exception while running commands: " + e.getMessage());
- }
-
- shell.getReader().flush();
-
- final String[] output = new String(out.toByteArray()).split("\n\r");
-
- boolean formatterOn = false;
-
- final String[] expectedDefault = new String[] {"row cf:cq [] 1234abcd", "row cf1:cq1 [] 9876fedc", "row2 cf:cq [] 13579bdf",
- "row2 cf1:cq [] 2468ace"};
-
- final String[] expectedFormatted = new String[] {"row cf:cq [] 0x31 0x32 0x33 0x34 0x61 0x62 0x63 0x64",
- "row cf1:cq1 [] 0x39 0x38 0x37 0x36 0x66 0x65 0x64 0x63", "row2 cf:cq [] 0x31 0x33 0x35 0x37 0x39 0x62 0x64 0x66",
- "row2 cf1:cq [] 0x32 0x34 0x36 0x38 0x61 0x63 0x65"};
-
- int outputIndex = 0;
- while (outputIndex < output.length) {
- final String line = output[outputIndex];
-
- if (line.startsWith("root@mock-instance")) {
- if (line.contains("formatter")) {
- formatterOn = true;
- }
-
- outputIndex++;
- } else if (line.startsWith("row")) {
- int expectedIndex = 0;
- String[] comparisonData;
-
- // Pick the type of data we expect (formatted or default)
- if (formatterOn) {
- comparisonData = expectedFormatted;
- } else {
- comparisonData = expectedDefault;
- }
-
- // Ensure each output is what we expected
- while (expectedIndex + outputIndex < output.length && expectedIndex < comparisonData.length) {
- Assert.assertEquals(comparisonData[expectedIndex].trim(), output[expectedIndex + outputIndex].trim());
- expectedIndex++;
- }
-
- outputIndex += expectedIndex;
- }
- }
- }
-
- private String[] createCommands() {
- return new String[] {"table test", "insert row cf cq 1234abcd", "insert row cf1 cq1 9876fedc", "insert row2 cf cq 13579bdf", "insert row2 cf1 cq 2468ace",
- "scan", "formatter -t test -f org.apache.accumulo.core.util.shell.command.FormatterCommandTest$HexFormatter", "scan"};
- }
-
- /**
- * <p>
- * Simple <code>Formatter</code> that displays each byte of the Value in hexadecimal. Bytes that do not fall within the
- * ASCII [0-9,a-f] range are silently skipped.
- * </p>
- *
- * <p>
- * Example: <code>'0'</code> will be displayed as <code>'0x30'</code>
- * </p>
- */
- public static class HexFormatter implements Formatter {
- private Iterator<Entry<Key,Value>> iter = null;
- private boolean printTs = false;
-
- private static final String tab = "\t";
- private static final String newline = "\n";
-
- public HexFormatter() {}
-
- @Override
- public boolean hasNext() {
- return this.iter.hasNext();
- }
-
- @Override
- public String next() {
- final Entry<Key,Value> entry = iter.next();
-
- String key;
-
- // Observe the timestamps
- if (printTs) {
- key = entry.getKey().toString();
- } else {
- key = entry.getKey().toStringNoTime();
- }
-
- final Value v = entry.getValue();
-
- // Approximate how much space we'll need
- final StringBuilder sb = new StringBuilder(key.length() + v.getSize() * 5);
-
- sb.append(key).append(tab);
-
- for (byte b : v.get()) {
- if ((b >= '0' && b <= '9') || (b >= 'a' && b <= 'f')) {
- sb.append(String.format("0x%x ", Integer.valueOf(b)));
- }
- }
-
- sb.append(newline);
-
- return sb.toString();
- }
-
- @Override
- public void remove() {}
-
- @Override
- public void initialize(final Iterable<Entry<Key,Value>> scanner, final boolean printTimestamps) {
- this.iter = scanner.iterator();
- this.printTs = printTimestamps;
- }
- }
-
-}
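
Although the shell-driven test is gone, the Formatter contract it exercised is simple to drive by hand. A minimal sketch, assuming a Connector named conn is in scope and the "test" table from the commands above exists:

    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.core.util.format.Formatter;

    Scanner scanner = conn.createScanner("test", Authorizations.EMPTY);
    Formatter formatter = new FormatterCommandTest.HexFormatter();
    formatter.initialize(scanner, false);  // false = no timestamps, matching toStringNoTime()
    while (formatter.hasNext()) {
      System.out.print(formatter.next()); // next() already appends the trailing newline
    }
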
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java
deleted file mode 100644
index 6b5e6eb..0000000
--- a/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.ConfigurableMacBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.junit.Test;
-
-/**
- * Prevent regression of ACCUMULO-3709.
- */
-public class AccumuloOutputFormatIT extends ConfigurableMacBase {
-
- private static final String TABLE = "abc";
-
- @Override
- protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "1");
- cfg.setNumTservers(1);
- }
-
- @Test
- public void testMapred() throws Exception {
- Connector connector = getConnector();
- // create a table and put some data in it
- connector.tableOperations().create(TABLE);
-
- JobConf job = new JobConf();
- BatchWriterConfig batchConfig = new BatchWriterConfig();
- // Disable time-based flushes entirely (max latency of 0 means never flush on a timer)
- batchConfig.setMaxLatency(0, TimeUnit.MILLISECONDS);
- // use a single thread to ensure our update session times out
- batchConfig.setMaxWriteThreads(1);
- // Set the max memory high enough that the writes never trigger a size-based flush.
- batchConfig.setMaxMemory(Long.MAX_VALUE);
- AccumuloOutputFormat outputFormat = new AccumuloOutputFormat();
- AccumuloOutputFormat.setBatchWriterOptions(job, batchConfig);
- AccumuloOutputFormat.setZooKeeperInstance(job, cluster.getClientConfig());
- AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken(ROOT_PASSWORD));
- RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(null, job, "Test", null);
-
- try {
- for (int i = 0; i < 3; i++) {
- Mutation m = new Mutation(new Text(String.format("%08d", i)));
- for (int j = 0; j < 3; j++) {
- m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8)));
- }
- writer.write(new Text(TABLE), m);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- // We don't want the exception to come from write(); it should surface from close()
- }
-
- connector.securityOperations().revokeTablePermission("root", TABLE, TablePermission.WRITE);
-
- try {
- writer.close(null);
- fail("Did not throw exception");
- } catch (IOException ex) {
- log.info(ex.getMessage(), ex);
- assertTrue(ex.getCause() instanceof MutationsRejectedException);
- }
- }
-}
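
The BatchWriterConfig recipe above, buffer everything and flush never, is reusable with a plain BatchWriter as well. A minimal sketch, assuming a Connector named connector is in scope and a table "abc" exists:

    import java.util.concurrent.TimeUnit;
    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;

    BatchWriterConfig cfg = new BatchWriterConfig();
    cfg.setMaxLatency(0, TimeUnit.MILLISECONDS); // 0 disables time-based flushing
    cfg.setMaxWriteThreads(1);                   // a single thread keeps sends strictly ordered
    cfg.setMaxMemory(Long.MAX_VALUE);            // never flush because the buffer filled
    BatchWriter bw = connector.createBatchWriter("abc", cfg);
    // Mutations added now sit in memory until bw.flush() or bw.close() is called.
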