You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 06:29:12 UTC
[1/8] git commit: ACCUMULO-2825 Add RowEncodingIterator
Repository: accumulo
Updated Branches:
refs/heads/ACCUMULO-378 417b0b332 -> 4ac04b95d
ACCUMULO-2825 Add RowEncodingIterator
The WholeRowIterator now extends the abstract RowEncodingIterator
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/73a9e6cf
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/73a9e6cf
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/73a9e6cf
Branch: refs/heads/ACCUMULO-378
Commit: 73a9e6cfcad6c836ed9eb95421bc7306037739a9
Parents: 244c1ab
Author: Ryan Leary <rl...@bbn.com>
Authored: Sat May 17 21:03:03 2014 -0400
Committer: Ryan Leary <rl...@bbn.com>
Committed: Sat May 17 21:10:52 2014 -0400
----------------------------------------------------------------------
.../iterators/user/RowEncodingIterator.java | 172 +++++++++++++++++++
.../core/iterators/user/WholeRowIterator.java | 130 ++------------
2 files changed, 191 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/73a9e6cf/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
new file mode 100644
index 0000000..dff1e04
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.hadoop.io.Text;
+
+/**
+ *
+ * The WholeRowIterator is designed to provide row-isolation so that queries see mutations as atomic. It does so by encapsulating an entire row of key/value
+ * pairs into a single key/value pair, which is returned through the client as an atomic operation.
+ *
+ * <p>
+ * One caveat is that when seeking in the WholeRowIterator using a range that starts at a non-inclusive first key in a row, (e.g. seek(new Range(new Key(new
+ * Text("row")),false,...),...)) this iterator will skip to the next row. This is done in order to prevent repeated scanning of the same row when system
+ * automatically creates ranges of that form, which happens in the case of the client calling continueScan, or in the case of the tablet server continuing a
+ * scan after swapping out sources.
+ *
+ * <p>
+ * To regain the original key/value pairs of the row, call the decodeRow function on the key/value pair that this iterator returned.
+ *
+ * @see RowFilter
+ */
+public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,Value> {
+
+ protected SortedKeyValueIterator<Key,Value> sourceIter;
+ private Key topKey = null;
+ private Value topValue = null;
+
+ // decode a bunch of key value pairs that have been encoded into a single value
+ /**
+ * Given a value generated by the rowEncoder implementation, recreate the
+ * original Key, Value pairs.
+ * @param rowKey
+ * @param rowValue
+ * @return
+ * @throws IOException
+ */
+ public abstract SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException;
+
+ /**
+ * Take a stream of keys and values. Return values in the same order
+ * encoded such that all portions of the key (except for the row value)
+ * and the original value are encoded in some way.
+ * @param keys
+ * @param values
+ * @return
+ * @throws IOException
+ */
+ public abstract Value rowEncoder(List<Key> keys, List<Value> values) throws IOException;
+
+ /**
+ * Implement deepCopy. Ensure sourceIter is copied appropriately.
+ */
+ @Override
+ public abstract SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env);
+
+ List<Key> keys = new ArrayList<Key>();
+ List<Value> values = new ArrayList<Value>();
+
+ private void prepKeys() throws IOException {
+ if (topKey != null)
+ return;
+ Text currentRow;
+ do {
+ if (sourceIter.hasTop() == false)
+ return;
+ currentRow = new Text(sourceIter.getTopKey().getRow());
+ keys.clear();
+ values.clear();
+ while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow)) {
+ keys.add(new Key(sourceIter.getTopKey()));
+ values.add(new Value(sourceIter.getTopValue()));
+ sourceIter.next();
+ }
+ } while (!filter(currentRow, keys, values));
+
+ topKey = new Key(currentRow);
+ topValue = rowEncoder(keys, values);
+ }
+
+ /**
+ *
+ * @param currentRow
+ * All keys have this in their row portion (do not modify!).
+ * @param keys
+ * One key for each key in the row, ordered as they are given by the source iterator (do not modify!).
+ * @param values
+ * One value for each key in keys, ordered to correspond to the ordering in keys (do not modify!).
+ * @return true if we want to keep the row, false if we want to skip it
+ */
+ protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
+ return true;
+ }
+
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return topValue;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return topKey != null;
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ sourceIter = source;
+ }
+
+ @Override
+ public void next() throws IOException {
+ topKey = null;
+ topValue = null;
+ prepKeys();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ topKey = null;
+ topValue = null;
+
+ Key sk = range.getStartKey();
+
+ if (sk != null && sk.getColumnFamilyData().length() == 0 && sk.getColumnQualifierData().length() == 0 && sk.getColumnVisibilityData().length() == 0
+ && sk.getTimestamp() == Long.MAX_VALUE && !range.isStartKeyInclusive()) {
+ // assuming that we are seeking using a key previously returned by this iterator
+ // therefore go to the next row
+ Key followingRowKey = sk.followingKey(PartialKey.ROW);
+ if (range.getEndKey() != null && followingRowKey.compareTo(range.getEndKey()) > 0)
+ return;
+
+ range = new Range(sk.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive());
+ }
+
+ sourceIter.seek(range, columnFamilies, inclusive);
+ prepKeys();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/73a9e6cf/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
index 525f27c..c8bceea 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
@@ -21,21 +21,15 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.hadoop.io.Text;
/**
*
@@ -53,19 +47,29 @@ import org.apache.hadoop.io.Text;
*
* @see RowFilter
*/
-public class WholeRowIterator implements SortedKeyValueIterator<Key,Value> {
-
- private SortedKeyValueIterator<Key,Value> sourceIter;
- private Key topKey = null;
- private Value topValue = null;
-
- public WholeRowIterator() {
-
- }
+public class WholeRowIterator extends RowEncodingIterator {
+ public WholeRowIterator() {}
WholeRowIterator(SortedKeyValueIterator<Key,Value> source) {
this.sourceIter = source;
}
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ if (sourceIter != null)
+ return new WholeRowIterator(sourceIter.deepCopy(env));
+ return new WholeRowIterator();
+ }
+
+ @Override
+ public SortedMap<Key, Value> rowDecoder(Key rowKey, Value rowValue) throws IOException {
+ return decodeRow(rowKey, rowValue);
+ }
+
+ @Override
+ public Value rowEncoder(List<Key> keys, List<Value> values) throws IOException {
+ return encodeRow(keys, values);
+ }
/**
* Returns the byte array containing the field of row key from the given DataInputStream din.
@@ -139,100 +143,4 @@ public class WholeRowIterator implements SortedKeyValueIterator<Key,Value> {
return new Value(out.toByteArray());
}
-
- List<Key> keys = new ArrayList<Key>();
- List<Value> values = new ArrayList<Value>();
-
- private void prepKeys() throws IOException {
- if (topKey != null)
- return;
- Text currentRow;
- do {
- if (sourceIter.hasTop() == false)
- return;
- currentRow = new Text(sourceIter.getTopKey().getRow());
- keys.clear();
- values.clear();
- while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow)) {
- keys.add(new Key(sourceIter.getTopKey()));
- values.add(new Value(sourceIter.getTopValue()));
- sourceIter.next();
- }
- } while (!filter(currentRow, keys, values));
-
- topKey = new Key(currentRow);
- topValue = encodeRow(keys, values);
-
- }
-
- /**
- *
- * @param currentRow
- * All keys have this in their row portion (do not modify!).
- * @param keys
- * One key for each key in the row, ordered as they are given by the source iterator (do not modify!).
- * @param values
- * One value for each key in keys, ordered to correspond to the ordering in keys (do not modify!).
- * @return true if we want to keep the row, false if we want to skip it
- */
- protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
- return true;
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- if (sourceIter != null)
- return new WholeRowIterator(sourceIter.deepCopy(env));
- return new WholeRowIterator();
- }
-
- @Override
- public Key getTopKey() {
- return topKey;
- }
-
- @Override
- public Value getTopValue() {
- return topValue;
- }
-
- @Override
- public boolean hasTop() {
- return topKey != null;
- }
-
- @Override
- public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- sourceIter = source;
- }
-
- @Override
- public void next() throws IOException {
- topKey = null;
- topValue = null;
- prepKeys();
- }
-
- @Override
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
- topKey = null;
- topValue = null;
-
- Key sk = range.getStartKey();
-
- if (sk != null && sk.getColumnFamilyData().length() == 0 && sk.getColumnQualifierData().length() == 0 && sk.getColumnVisibilityData().length() == 0
- && sk.getTimestamp() == Long.MAX_VALUE && !range.isStartKeyInclusive()) {
- // assuming that we are seeking using a key previously returned by this iterator
- // therefore go to the next row
- Key followingRowKey = sk.followingKey(PartialKey.ROW);
- if (range.getEndKey() != null && followingRowKey.compareTo(range.getEndKey()) > 0)
- return;
-
- range = new Range(sk.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive());
- }
-
- sourceIter.seek(range, columnFamilies, inclusive);
- prepKeys();
- }
-
}
[8/8] git commit: Merge remote-tracking branch 'origin/master' into
ACCUMULO-378
Posted by el...@apache.org.
Merge remote-tracking branch 'origin/master' into ACCUMULO-378
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4ac04b95
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4ac04b95
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4ac04b95
Branch: refs/heads/ACCUMULO-378
Commit: 4ac04b95d7f1919a35af2d9e60d80a2c9b94074c
Parents: 178ffe9 f131ac6
Author: Josh Elser <el...@apache.org>
Authored: Wed May 21 00:28:59 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 21 00:28:59 2014 -0400
----------------------------------------------------------------------
.../iterators/user/RowEncodingIterator.java | 174 +++++++++++++++++++
.../core/iterators/user/WholeRowIterator.java | 140 +++------------
2 files changed, 199 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
[7/8] git commit: Merge branch '1.6.1-SNAPSHOT'
Posted by el...@apache.org.
Merge branch '1.6.1-SNAPSHOT'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f131ac6e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f131ac6e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f131ac6e
Branch: refs/heads/ACCUMULO-378
Commit: f131ac6e734104d5b88df02f9f7ebb2b9fe58b4f
Parents: b510b76 1193f4b
Author: Josh Elser <el...@apache.org>
Authored: Tue May 20 22:57:43 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 20 22:57:43 2014 -0400
----------------------------------------------------------------------
.../iterators/user/RowEncodingIterator.java | 174 +++++++++++++++++++
.../core/iterators/user/WholeRowIterator.java | 140 +++------------
2 files changed, 199 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
[3/8] ACCUMULO-378 Remove compiler warnings and unused classes,
resolve formatting issues
Posted by el...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
index 4254154..f879895 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
@@ -49,7 +49,7 @@ public class ReplicationPortAdvertisementIT extends ConfigurableMacIT {
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setNumTservers(2);
}
-
+
@Test
public void tserverReplicationServicePortsAreAdvertised() throws Exception {
// Wait for the cluster to be up
@@ -81,7 +81,7 @@ public class ReplicationPortAdvertisementIT extends ConfigurableMacIT {
// Each tserver should also have equial replicaiton services running internally
Assert.assertEquals("Expected an equal number of replication servicers and tservers", tserverHost.size(), replicationServices.size());
}
-
+
@Test
public void masterReplicationServicePortsAreAdvertised() throws Exception {
// Wait for the cluster to be up
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
index 81ae5ca..62c09f5 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
@@ -110,7 +110,7 @@ public class ReplicationSourceOnlyIT extends ConfigurableMacIT {
}
}
}
-
+
});
t.start();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
index 00524b8..da874fa 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
@@ -16,15 +16,11 @@
*/
package org.apache.accumulo.test.replication;
-import java.util.Map.Entry;
-
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
index 90bfabf..7146019 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
@@ -105,14 +105,14 @@ public class ReplicationTest extends ConfigurableMacIT {
StatusSection.limit(scanner);
for (Entry<Key,Value> entry : scanner) {
Key k = entry.getKey();
-
+
String fileUri = k.getRow().toString();
try {
new URI(fileUri);
} catch (URISyntaxException e) {
Assert.fail("Expected a valid URI: " + fileUri);
}
-
+
replRows.add(fileUri);
}
}
@@ -257,7 +257,8 @@ public class ReplicationTest extends ConfigurableMacIT {
Assert.assertTrue(iter.hasNext());
Entry<Key,Value> entry = iter.next();
// We should at least find one status record for this table, we might find a second if another log was started from ingesting the data
- Assert.assertEquals("Expected to find replication entry for " + table1, conn.tableOperations().tableIdMap().get(table1), entry.getKey().getColumnQualifier().toString());
+ Assert.assertEquals("Expected to find replication entry for " + table1, conn.tableOperations().tableIdMap().get(table1), entry.getKey()
+ .getColumnQualifier().toString());
s.close();
// Enable replication on table2
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/test/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/test/src/test/resources/log4j.properties b/test/src/test/resources/log4j.properties
index 5566a3b..031b685 100644
--- a/test/src/test/resources/log4j.properties
+++ b/test/src/test/resources/log4j.properties
@@ -35,7 +35,6 @@ log4j.logger.org.apache.accumulo.core.file.rfile.bcfile=INFO
log4j.logger.org.apache.accumulo.server.util.ReplicationTableUtil=TRACE
log4j.logger.org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator=INFO
log4j.logger.org.apache.accumulo.core.client.impl.ThriftScanner=INFO
-#log4j.logger.org.apache.accumulo.server.zookeeper.DistributedWorkQueue=INFO
log4j.logger.org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock=WARN
log4j.logger.org.mortbay.log=WARN
log4j.logger.org.apache.hadoop=WARN
[6/8] git commit: Merge branch 'ACCUMULO-2825' into 1.6.1-SNAPSHOT
Posted by el...@apache.org.
Merge branch 'ACCUMULO-2825' into 1.6.1-SNAPSHOT
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1193f4b8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1193f4b8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1193f4b8
Branch: refs/heads/ACCUMULO-378
Commit: 1193f4b84b1ede802b5ac636e802efb3052b7fa0
Parents: 244c1ab b7ccde3
Author: Josh Elser <el...@apache.org>
Authored: Tue May 20 22:53:56 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 20 22:53:56 2014 -0400
----------------------------------------------------------------------
.../iterators/user/RowEncodingIterator.java | 174 +++++++++++++++++++
.../core/iterators/user/WholeRowIterator.java | 140 +++------------
2 files changed, 199 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
[5/8] git commit: ACCUMULO-2825 Fix formatting and remove javadoc
warnings
Posted by el...@apache.org.
ACCUMULO-2825 Fix formatting and remove javadoc warnings
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b7ccde3a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b7ccde3a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b7ccde3a
Branch: refs/heads/ACCUMULO-378
Commit: b7ccde3a2bfaf622779c931fce701e4361b908cd
Parents: b2eaf13
Author: Josh Elser <el...@apache.org>
Authored: Tue May 20 22:40:43 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 20 22:40:43 2014 -0400
----------------------------------------------------------------------
.../iterators/user/RowEncodingIterator.java | 70 +++++++++-----------
.../core/iterators/user/WholeRowIterator.java | 25 +++----
2 files changed, 43 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b7ccde3a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
index f083eb3..6387723 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
@@ -34,65 +34,59 @@ import org.apache.hadoop.io.Text;
/**
*
- * The RowEncodingIterator is designed to provide row-isolation so that queries see mutations as atomic.
- * It does so by encapsulating an entire row of key/value pairs into a single key/value pair, which is
- * returned through the client as an atomic operation. This is an abstract class, allowing the user to
- * implement rowEncoder and rowDecoder such that the columns and values of a given row may be encoded
- * in a format best suited to the client.
+ * The RowEncodingIterator is designed to provide row-isolation so that queries see mutations as atomic. It does so by encapsulating an entire row of key/value
+ * pairs into a single key/value pair, which is returned through the client as an atomic operation. This is an abstract class, allowing the user to implement
+ * rowEncoder and rowDecoder such that the columns and values of a given row may be encoded in a format best suited to the client.
*
* <p>
* For an example implementation, see {@link WholeRowIterator}.
- *
+ *
* <p>
- * One caveat is that when seeking in the WholeRowIterator using a range that starts at a non-inclusive
- * first key in a row, (e.g. seek(new Range(new Key(new Text("row")),false,...),...)) this iterator will
- * skip to the next row. This is done in order to prevent repeated scanning of the same row when system
- * automatically creates ranges of that form, which happens in the case of the client calling
- * continueScan, or in the case of the tablet server continuing a scan after swapping out sources.
+ * One caveat is that when seeking in the WholeRowIterator using a range that starts at a non-inclusive first key in a row, (e.g. seek(new Range(new Key(new
+ * Text("row")),false,...),...)) this iterator will skip to the next row. This is done in order to prevent repeated scanning of the same row when system
+ * automatically creates ranges of that form, which happens in the case of the client calling continueScan, or in the case of the tablet server continuing a
+ * scan after swapping out sources.
*
* <p>
- * To regain the original key/value pairs of the row, call the rowDecoder function on the key/value
- * pair that this iterator returned.
+ * To regain the original key/value pairs of the row, call the rowDecoder function on the key/value pair that this iterator returned.
*
* @see RowFilter
*/
public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,Value> {
-
+
protected SortedKeyValueIterator<Key,Value> sourceIter;
private Key topKey = null;
private Value topValue = null;
// decode a bunch of key value pairs that have been encoded into a single value
/**
- * Given a value generated by the rowEncoder implementation, recreate the
- * original Key, Value pairs.
+ * Given a value generated by the rowEncoder implementation, recreate the original Key, Value pairs.
+ *
* @param rowKey
* @param rowValue
- * @return
* @throws IOException
*/
public abstract SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException;
-
+
/**
- * Take a stream of keys and values. Return values in the same order
- * encoded such that all portions of the key (except for the row value)
- * and the original value are encoded in some way.
+ * Take a stream of keys and values. Return values in the same order encoded such that all portions of the key (except for the row value) and the original
+ * value are encoded in some way.
+ *
* @param keys
* @param values
- * @return
* @throws IOException
*/
public abstract Value rowEncoder(List<Key> keys, List<Value> values) throws IOException;
-
+
/**
* Implement deepCopy. Ensure sourceIter is copied appropriately.
*/
@Override
public abstract SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env);
-
+
List<Key> keys = new ArrayList<Key>();
List<Value> values = new ArrayList<Value>();
-
+
private void prepKeys() throws IOException {
if (topKey != null)
return;
@@ -109,11 +103,11 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
sourceIter.next();
}
} while (!filter(currentRow, keys, values));
-
+
topKey = new Key(currentRow);
topValue = rowEncoder(keys, values);
}
-
+
/**
*
* @param currentRow
@@ -127,41 +121,41 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
return true;
}
-
+
@Override
public Key getTopKey() {
return topKey;
}
-
+
@Override
public Value getTopValue() {
return topValue;
}
-
+
@Override
public boolean hasTop() {
return topKey != null;
}
-
+
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
sourceIter = source;
}
-
+
@Override
public void next() throws IOException {
topKey = null;
topValue = null;
prepKeys();
}
-
+
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
topKey = null;
topValue = null;
-
+
Key sk = range.getStartKey();
-
+
if (sk != null && sk.getColumnFamilyData().length() == 0 && sk.getColumnQualifierData().length() == 0 && sk.getColumnVisibilityData().length() == 0
&& sk.getTimestamp() == Long.MAX_VALUE && !range.isStartKeyInclusive()) {
// assuming that we are seeking using a key previously returned by this iterator
@@ -169,12 +163,12 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
Key followingRowKey = sk.followingKey(PartialKey.ROW);
if (range.getEndKey() != null && followingRowKey.compareTo(range.getEndKey()) > 0)
return;
-
+
range = new Range(sk.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive());
}
-
+
sourceIter.seek(range, columnFamilies, inclusive);
prepKeys();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b7ccde3a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
index 060a004..4b7802d 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
@@ -33,22 +33,19 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
/**
*
- * The WholeRowIterator is designed to provide row-isolation so that queries see mutations as atomic.
- * It does so by encapsulating an entire row of key/value pairs into a single key/value pair, which
- * is returned through the client as an atomic operation.
+ * The WholeRowIterator is designed to provide row-isolation so that queries see mutations as atomic. It does so by encapsulating an entire row of key/value
+ * pairs into a single key/value pair, which is returned through the client as an atomic operation.
*
* <p>
- * This iterator extends the {@link RowEncodingIterator}, providing implementations for rowEncoder
- * and rowDecoder which serializes all column and value information from a given row into a
- * single ByteStream in a value.
+ * This iterator extends the {@link RowEncodingIterator}, providing implementations for rowEncoder and rowDecoder which serializes all column and value
+ * information from a given row into a single ByteStream in a value.
*
* <p>
- * As with the RowEncodingIterator, when seeking in the WholeRowIterator using a range that starts
- * at a non-inclusive first key in a row, this iterator will skip to the next row.
+ * As with the RowEncodingIterator, when seeking in the WholeRowIterator using a range that starts at a non-inclusive first key in a row, this iterator will
+ * skip to the next row.
*
* <p>
- * To regain the original key/value pairs of the row, call the decodeRow function on the key/value
- * pair that this iterator returned.
+ * To regain the original key/value pairs of the row, call the decodeRow function on the key/value pair that this iterator returned.
*
* @see RowFilter
*/
@@ -58,7 +55,7 @@ public class WholeRowIterator extends RowEncodingIterator {
WholeRowIterator(SortedKeyValueIterator<Key,Value> source) {
this.sourceIter = source;
}
-
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
if (sourceIter != null)
@@ -67,13 +64,13 @@ public class WholeRowIterator extends RowEncodingIterator {
}
@Override
- public SortedMap<Key, Value> rowDecoder(Key rowKey, Value rowValue) throws IOException {
- return decodeRow(rowKey, rowValue);
+ public SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException {
+ return decodeRow(rowKey, rowValue);
}
@Override
public Value rowEncoder(List<Key> keys, List<Value> values) throws IOException {
- return encodeRow(keys, values);
+ return encodeRow(keys, values);
}
/**
[4/8] git commit: ACCUMULO-378 Remove compiler warnings and unused
classes, resolve formatting issues
Posted by el...@apache.org.
ACCUMULO-378 Remove compiler warnings and unused classes, resolve formatting issues
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/178ffe97
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/178ffe97
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/178ffe97
Branch: refs/heads/ACCUMULO-378
Commit: 178ffe977027c62629fb46337eee02312114a048
Parents: 417b0b3
Author: Josh Elser <el...@apache.org>
Authored: Tue May 20 22:26:12 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 20 22:26:12 2014 -0400
----------------------------------------------------------------------
.../client/admin/ReplicationOperations.java | 2 +-
.../core/client/impl/ConnectorImpl.java | 2 +-
.../core/client/impl/ReplicationClient.java | 3 +-
.../client/impl/ReplicationOperationsImpl.java | 2 +-
.../replication/ReplicaSystemFactory.java | 12 +-
.../org/apache/accumulo/core/data/Mutation.java | 2 +-
.../core/metadata/schema/MetadataSchema.java | 22 +++-
.../replication/PrintReplicationRecords.java | 4 +-
.../core/replication/ReplicationSchema.java | 59 ++++++---
.../apache/accumulo/core/schema/Section.java | 4 +-
.../ReplicationOperationsImplTest.java | 6 +-
.../replication/AbstractWorkAssigner.java | 1 +
.../replication/ReplicationTableTest.java | 2 +-
.../server/util/ReplicationTableUtilTest.java | 4 +-
.../gc/GarbageCollectWriteAheadLogs.java | 132 ++++++++++---------
.../accumulo/gc/GarbageCollectionAlgorithm.java | 2 +-
.../accumulo/gc/SimpleGarbageCollector.java | 4 +-
.../gc/GarbageCollectWriteAheadLogsTest.java | 11 +-
.../CloseWriteAheadLogReferencesTest.java | 19 ++-
.../java/org/apache/accumulo/master/Master.java | 4 +-
.../MasterReplicationCoordinator.java | 9 +-
.../replication/SequentialWorkAssigner.java | 6 +-
.../accumulo/master/replication/Work.java | 98 --------------
.../accumulo/master/replication/WorkMaker.java | 16 +--
.../replication/AbstractWorkAssignerTest.java | 4 +-
.../MasterReplicationCoordinatorTest.java | 11 +-
.../RemoveCompleteReplicationRecordsTest.java | 11 +-
.../master/replication/StatusMakerTest.java | 4 +-
.../accumulo/master/replication/WorkTest.java | 49 -------
.../apache/accumulo/tserver/TabletServer.java | 6 +-
.../BatchWriterReplicationReplayer.java | 7 +-
.../replication/ReplicationProcessor.java | 19 ++-
.../replication/ReplicationServicerHandler.java | 12 +-
.../tserver/log/LocalWALRecoveryTest.java | 2 +-
.../replication/AccumuloReplicaSystemTest.java | 12 +-
.../replication/ReplicationProcessorTest.java | 4 +-
.../test/replication/MockReplicaSystem.java | 1 -
.../test/replication/CyclicReplicationIT.java | 10 +-
.../replication/ReplicationDeadlockTest.java | 7 +-
.../ReplicationPortAdvertisementIT.java | 4 +-
.../replication/ReplicationSourceOnlyIT.java | 2 +-
.../replication/ReplicationTablesMacTest.java | 4 -
.../test/replication/ReplicationTest.java | 7 +-
test/src/test/resources/log4j.properties | 1 -
44 files changed, 250 insertions(+), 353 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
index 1680732..1d20f79 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
@@ -41,7 +41,7 @@ public interface ReplicationOperations {
/**
* Define a cluster with the given name and the given name system
* @param name Unique name for the cluster
- * @param replicaType {@link ReplicaSystem} class name to use to replicate the data
+ * @param replicaType {@link ReplicaSystem} class name to use to replicate the data
* @throws PeerExistsException
*/
public void addPeer(String name, String replicaType) throws AccumuloException, AccumuloSecurityException, PeerExistsException;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index e22be52..62ab6a4 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -189,7 +189,7 @@ public class ConnectorImpl extends Connector {
@Override
public synchronized ReplicationOperations replicationOperations() {
if (null == replicationops) {
- replicationops = new ReplicationOperationsImpl(instance, credentials);
+ replicationops = new ReplicationOperationsImpl(instance, credentials);
}
return replicationops;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index 55d2208..6e36759 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -77,10 +77,9 @@ public class ReplicationClient {
if (masterThriftService.endsWith(":0"))
return null;
-
+
AccumuloConfiguration conf = ServerConfigurationUtil.getConfiguration(instance);
- HostAndPort masterAddr = HostAndPort.fromString(masterThriftService);
String zkPath = ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR;
String replCoordinatorAddr;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 373a51b..4355867 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -257,7 +257,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
log.debug("Found id of {} for name {}", strTableId, tableName);
// Get the WALs currently referenced by the table
- BatchScanner metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
+ BatchScanner metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(strTableId)));
metaBs.fetchColumnFamily(LogColumnFamily.NAME);
Set<String> wals = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
index 619aae9..d1df97e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
@@ -28,7 +28,8 @@ public class ReplicaSystemFactory {
private static final Logger log = LoggerFactory.getLogger(ReplicaSystemFactory.class);
/**
- * @param value {@link ReplicaSystem} implementation class name
+ * @param value
+ * {@link ReplicaSystem} implementation class name
* @return A {@link ReplicaSystem} object from the given name
*/
public static ReplicaSystem get(String value) {
@@ -41,7 +42,7 @@ public class ReplicaSystemFactory {
String name = value.substring(0, index);
String configuration = value.substring(index + 1);
-
+
try {
Class<?> clz = Class.forName(name);
Object o = clz.newInstance();
@@ -61,8 +62,11 @@ public class ReplicaSystemFactory {
/**
* Generate the configuration value for a {@link ReplicaSystem} in the instance properties
- * @param system The desired ReplicaSystem to use
- * @param configuration Configuration string for the desired ReplicaSystem
+ *
+ * @param system
+ * The desired ReplicaSystem to use
+ * @param configuration
+ * Configuration string for the desired ReplicaSystem
* @return Value to set for peer configuration in the instance
*/
public static String getPeerConfigurationValue(Class<? extends ReplicaSystem> system, String configuration) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
index c27a25a..619e522 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
@@ -965,7 +965,7 @@ public class Mutation implements Writable {
serialize();
m.serialize();
if (Arrays.equals(row, m.row) && entries == m.entries && Arrays.equals(data, m.data)) {
- // If two mutations don't have the same
+ // If two mutations don't have the same
if (!replicationSources.equals(m.replicationSources)) {
return false;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 11fcd5a..0d8a0dc 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -247,9 +247,11 @@ public class MetadataSchema {
/**
* Extract the table ID from the colfam (inefficiently if called repeatedly)
- * @param k Key to extract from
+ *
+ * @param k
+ * Key to extract from
* @return The table ID
- * @see #getTableId(Key,Text)
+ * @see #getTableId(Key,Text)
*/
public static String getTableId(Key k) {
Text buff = new Text();
@@ -259,8 +261,11 @@ public class MetadataSchema {
/**
* Extract the table ID from the colfam into the given {@link Text}
- * @param k Key to extract from
- * @param buff Text to place table ID into
+ *
+ * @param k
+ * Key to extract from
+ * @param buff
+ * Text to place table ID into
*/
public static void getTableId(Key k, Text buff) {
Preconditions.checkNotNull(k);
@@ -271,8 +276,11 @@ public class MetadataSchema {
/**
* Extract the file name from the row suffix into the given {@link Text}
- * @param k Key to extract from
- * @param buff Text to place file name into
+ *
+ * @param k
+ * Key to extract from
+ * @param buff
+ * Text to place file name into
*/
public static void getFile(Key k, Text buff) {
Preconditions.checkNotNull(k);
@@ -280,7 +288,7 @@ public class MetadataSchema {
Preconditions.checkArgument(COLF_BYTE_SEQ.equals(k.getColumnFamilyData()), "Given metadata replication status key with incorrect colfam");
k.getRow(buff);
-
+
buff.set(buff.getBytes(), section.getRowPrefix().length(), buff.getLength() - section.getRowPrefix().length());
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
index 60aae9c..bb98440 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
@@ -47,7 +47,7 @@ public class PrintReplicationRecords implements Runnable {
private PrintStream out;
private SimpleDateFormat sdf;
- public PrintReplicationRecords(Connector conn,PrintStream out) {
+ public PrintReplicationRecords(Connector conn, PrintStream out) {
this.conn = conn;
this.out = out;
this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
@@ -93,6 +93,6 @@ public class PrintReplicationRecords implements Runnable {
} catch (InvalidProtocolBufferException e) {
out.println(entry.getKey().toStringNoTruncate() + "= Could not deserialize Status message");
}
- }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
index 96208af..8699bd2 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
@@ -67,6 +67,7 @@ public class ReplicationSchema {
/**
* Limit the scanner to only pull replication work records
+ *
* @param scanner
*/
public static void limit(ScannerBase scanner) {
@@ -86,13 +87,15 @@ public class ReplicationSchema {
*/
public static class StatusSection {
public static final Text NAME = new Text("repl");
- private static final ByteSequence BYTE_SEQ_NAME = new ArrayByteSequence("repl");
+ private static final ByteSequence BYTE_SEQ_NAME = new ArrayByteSequence("repl");
/**
* Extract the table ID from the key (inefficiently if called repeatedly)
- * @param k Key to extract from
+ *
+ * @param k
+ * Key to extract from
* @return The table ID
- * @see #getTableId(Key,Text)
+ * @see #getTableId(Key,Text)
*/
public static String getTableId(Key k) {
Text buff = new Text();
@@ -102,8 +105,11 @@ public class ReplicationSchema {
/**
* Extract the table ID from the key into the given {@link Text}
- * @param k Key to extract from
- * @param buff Text to place table ID into
+ *
+ * @param k
+ * Key to extract from
+ * @param buff
+ * Text to place table ID into
*/
public static void getTableId(Key k, Text buff) {
Preconditions.checkNotNull(k);
@@ -114,8 +120,11 @@ public class ReplicationSchema {
/**
* Extract the file name from the row suffix into the given {@link Text}
- * @param k Key to extract from
- * @param buff Text to place file name into
+ *
+ * @param k
+ * Key to extract from
+ * @param buff
+ * Text to place file name into
*/
public static void getFile(Key k, Text buff) {
Preconditions.checkNotNull(k);
@@ -127,6 +136,7 @@ public class ReplicationSchema {
/**
* Limit the scanner to only return Status records
+ *
* @param scanner
*/
public static void limit(ScannerBase scanner) {
@@ -140,8 +150,8 @@ public class ReplicationSchema {
}
/**
- * Holds the order in which files needed for replication were closed. The intent is to be able to guarantee
- * that files which were closed earlier were replicated first and we don't replay data in the wrong order on our peers
+ * Holds the order in which files needed for replication were closed. The intent is to be able to guarantee that files which were closed earlier were
+ * replicated first and we don't replay data in the wrong order on our peers
* <p>
* <code>encodedTimeOfClosure_hdfs://localhost:8020/accumulo/wal/tserver+port/WAL order:source_table_id [] -> Status Protobuf</code>
*/
@@ -152,7 +162,9 @@ public class ReplicationSchema {
/**
* Extract the table ID from the given key (inefficiently if called repeatedly)
- * @param k OrderSection Key
+ *
+ * @param k
+ * OrderSection Key
* @return source table id
*/
public static String getTableId(Key k) {
@@ -163,8 +175,11 @@ public class ReplicationSchema {
/**
* Extract the table ID from the given key
- * @param k OrderSection key
- * @param buff Text to place table ID into
+ *
+ * @param k
+ * OrderSection key
+ * @param buff
+ * Text to place table ID into
*/
public static void getTableId(Key k, Text buff) {
Preconditions.checkNotNull(k);
@@ -175,6 +190,7 @@ public class ReplicationSchema {
/**
* Limit the scanner to only return Order records
+ *
* @param scanner
*/
public static void limit(ScannerBase scanner) {
@@ -183,8 +199,11 @@ public class ReplicationSchema {
/**
* Creates the Mutation for the Order section for the given file and time
- * @param file Filename
- * @param timeInMillis Time in millis that the file was closed
+ *
+ * @param file
+ * Filename
+ * @param timeInMillis
+ * Time in millis that the file was closed
* @return Mutation for the Order section
*/
public static Mutation createMutation(String file, long timeInMillis) {
@@ -202,7 +221,7 @@ public class ReplicationSchema {
log.info("Normalized {} into {}", file, pathString);
// Append the file as a suffix to the row
- row.append((ROW_SEPARATOR+pathString).getBytes(), 0, pathString.length() + ROW_SEPARATOR.length());
+ row.append((ROW_SEPARATOR + pathString).getBytes(), 0, pathString.length() + ROW_SEPARATOR.length());
// Make the mutation and add the column update
return new Mutation(row);
@@ -210,9 +229,13 @@ public class ReplicationSchema {
/**
* Add a column update to the given mutation with the provided tableId and value
- * @param m Mutation for OrderSection
- * @param tableId Source table id
- * @param v Serialized Status msg
+ *
+ * @param m
+ * Mutation for OrderSection
+ * @param tableId
+ * Source table id
+ * @param v
+ * Serialized Status msg
* @return The original Mutation
*/
public static Mutation add(Mutation m, Text tableId, Value v) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/core/src/main/java/org/apache/accumulo/core/schema/Section.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/schema/Section.java b/core/src/main/java/org/apache/accumulo/core/schema/Section.java
index 3dfee5d..0cebf87 100644
--- a/core/src/main/java/org/apache/accumulo/core/schema/Section.java
+++ b/core/src/main/java/org/apache/accumulo/core/schema/Section.java
@@ -21,7 +21,7 @@ import org.apache.accumulo.core.data.Range;
public class Section {
private String rowPrefix;
private Range range;
-
+
public Section(String startRow, boolean startInclusive, String endRow, boolean endInclusive) {
rowPrefix = startRow;
range = new Range(startRow, startInclusive, endRow, endInclusive);
@@ -34,4 +34,4 @@ public class Section {
public Range getRange() {
return range;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
index f0670ee..b4a4c65 100644
--- a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
@@ -156,7 +156,7 @@ public class ReplicationOperationsImplTest {
Assert.fail("ReplicationOperations.drain did not complete");
}
- // After both metadata and replication
+ // After both metadata and replication
Assert.assertTrue(done.get());
Assert.assertFalse(exception.get());
}
@@ -240,7 +240,7 @@ public class ReplicationOperationsImplTest {
Assert.fail("ReplicationOperations.drain did not complete");
}
- // After both metadata and replication
+ // After both metadata and replication
Assert.assertTrue(done.get());
Assert.assertFalse(exception.get());
}
@@ -360,7 +360,7 @@ public class ReplicationOperationsImplTest {
System.out.println(e.getKey());
}
- final AtomicBoolean done = new AtomicBoolean(false), firstRunComplete = new AtomicBoolean(false);
+ final AtomicBoolean done = new AtomicBoolean(false);
final AtomicBoolean exception = new AtomicBoolean(false);
final ReplicationOperationsImpl roi = new ReplicationOperationsImpl(inst, new Credentials("root", new PasswordToken("")));
Thread t = new Thread(new Runnable() {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java b/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java
index 2a7b825..625a36e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/AbstractWorkAssigner.java
@@ -34,6 +34,7 @@ public abstract class AbstractWorkAssigner implements WorkAssigner {
protected boolean isWorkRequired(Status status) {
return StatusUtil.isWorkRequired(status);
}
+
public static final String KEY_SEPARATOR = "|";
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
index 74202be..bb80c81 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
@@ -51,7 +51,7 @@ public class ReplicationTableTest {
private MockInstance instance;
private Connector conn;
-
+
@Before
public void setupMockAccumulo() throws Exception {
instance = new MockInstance(testName.getMethodName());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index 3e3332f..12295ef 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -130,7 +130,7 @@ public class ReplicationTableUtilTest {
public void setsCombinerOnMetadataCorrectly() throws Exception {
Connector conn = createMock(Connector.class);
TableOperations tops = createMock(TableOperations.class);
-
+
String myMetadataTable = "mymetadata";
Map<String,EnumSet<IteratorScope>> iterators = new HashMap<>();
iterators.put("vers", EnumSet.of(IteratorScope.majc, IteratorScope.minc, IteratorScope.scan));
@@ -141,7 +141,7 @@ public class ReplicationTableUtilTest {
expect(tops.listIterators(myMetadataTable)).andReturn(iterators);
tops.attachIterator(myMetadataTable, combiner);
expectLastCall().once();
-
+
replay(conn, tops);
ReplicationTableUtil.configureMetadataTable(conn, myMetadataTable);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index c551c6c..269e15e 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -78,61 +78,66 @@ import com.google.protobuf.InvalidProtocolBufferException;
public class GarbageCollectWriteAheadLogs {
private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
-
+
private final Instance instance;
private final VolumeManager fs;
-
+
private boolean useTrash;
-
+
/**
* Creates a new GC WAL object.
- *
- * @param instance instance to use
- * @param fs volume manager to use
- * @param useTrash true to move files to trash rather than delete them
+ *
+ * @param instance
+ * instance to use
+ * @param fs
+ * volume manager to use
+ * @param useTrash
+ * true to move files to trash rather than delete them
*/
GarbageCollectWriteAheadLogs(Instance instance, VolumeManager fs, boolean useTrash) throws IOException {
this.instance = instance;
this.fs = fs;
this.useTrash = useTrash;
}
-
+
/**
* Gets the instance used by this object.
- *
+ *
* @return instance
*/
Instance getInstance() {
return instance;
}
+
/**
* Gets the volume manager used by this object.
- *
+ *
* @return volume manager
*/
VolumeManager getVolumeManager() {
return fs;
}
+
/**
- * Checks if the volume manager should move files to the trash rather than
- * delete them.
- *
+ * Checks if the volume manager should move files to the trash rather than delete them.
+ *
* @return true if trash is used
*/
boolean isUsingTrash() {
return useTrash;
}
+
public void collect(GCStatus status) {
-
+
Span span = Trace.start("scanServers");
try {
-
- Map<String, Path> sortedWALogs = getSortedWALogs();
+
+ Map<String,Path> sortedWALogs = getSortedWALogs();
status.currentLog.started = System.currentTimeMillis();
-
+
Map<Path,String> fileToServerMap = new HashMap<Path,String>();
- Map<String,Path> nameToFileMap = new HashMap<String, Path>();
+ Map<String,Path> nameToFileMap = new HashMap<String,Path>();
int count = scanServers(fileToServerMap, nameToFileMap);
long fileScanStop = System.currentTimeMillis();
log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count,
@@ -164,27 +169,27 @@ public class GarbageCollectWriteAheadLogs {
}
long replicationEntryScanStop = System.currentTimeMillis();
- log.info(String.format("%d replication entries scanned in %.2f seconds", count, (replicationEntryScanStop - logEntryScanStop) / 1000.));
-
+ log.info(String.format("%d replication entries scanned in %.2f seconds", count, (replicationEntryScanStop - logEntryScanStop) / 1000.));
+
span = Trace.start("removeFiles");
Map<String,ArrayList<Path>> serverToFileMap = mapServersToFiles(fileToServerMap, nameToFileMap);
count = removeFiles(nameToFileMap, serverToFileMap, sortedWALogs, status);
-
+
long removeStop = System.currentTimeMillis();
log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.));
status.currentLog.finished = removeStop;
status.lastLog = status.currentLog;
status.currentLog = new GcCycleStats();
span.stop();
-
+
} catch (Exception e) {
log.error("exception occured while garbage collecting write ahead logs", e);
} finally {
span.stop();
}
}
-
+
boolean holdsLock(HostAndPort addr) {
try {
String zpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + addr.toString();
@@ -197,8 +202,8 @@ public class GarbageCollectWriteAheadLogs {
return true;
}
}
-
- private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String, Path> sortedWALogs, final GCStatus status) {
+
+ private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(instance);
for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
if (entry.getKey().isEmpty()) {
@@ -247,7 +252,7 @@ public class GarbageCollectWriteAheadLogs {
}
}
}
-
+
for (Path swalog : sortedWALogs.values()) {
log.debug("Removing sorted WAL " + swalog);
try {
@@ -266,14 +271,15 @@ public class GarbageCollectWriteAheadLogs {
}
}
}
-
+
return 0;
}
-
+
/**
* Converts a list of paths to their corresponding strings.
- *
- * @param paths list of paths
+ *
+ * @param paths
+ * list of paths
* @return string forms of paths
*/
static List<String> paths2strings(List<Path> paths) {
@@ -282,14 +288,15 @@ public class GarbageCollectWriteAheadLogs {
result.add(path.toString());
return result;
}
-
+
/**
- * Reverses the given mapping of file paths to servers. The returned map
- * provides a list of file paths for each server. Any path whose name is not
- * in the mapping of file names to paths is skipped.
- *
- * @param fileToServerMap map of file paths to servers
- * @param nameToFileMap map of file names to paths
+ * Reverses the given mapping of file paths to servers. The returned map provides a list of file paths for each server. Any path whose name is not in the
+ * mapping of file names to paths is skipped.
+ *
+ * @param fileToServerMap
+ * map of file paths to servers
+ * @param nameToFileMap
+ * map of file names to paths
* @return map of servers to lists of file paths
*/
static Map<String,ArrayList<Path>> mapServersToFiles(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) {
@@ -306,9 +313,9 @@ public class GarbageCollectWriteAheadLogs {
}
return result;
}
-
- protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String, Path> sortedWALogs, GCStatus status, Credentials creds) throws IOException, KeeperException,
- InterruptedException {
+
+ protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status, Credentials creds) throws IOException,
+ KeeperException, InterruptedException {
int count = 0;
Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(creds);
@@ -335,11 +342,12 @@ public class GarbageCollectWriteAheadLogs {
return count;
}
- protected int removeReplicationEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status, Credentials creds) throws IOException, KeeperException, InterruptedException {
+ protected int removeReplicationEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status, Credentials creds) throws IOException,
+ KeeperException, InterruptedException {
Connector conn;
try {
- conn = instance.getConnector(creds.getPrincipal(), creds.getToken());
- } catch (AccumuloException|AccumuloSecurityException e) {
+ conn = instance.getConnector(creds.getPrincipal(), creds.getToken());
+ } catch (AccumuloException | AccumuloSecurityException e) {
log.error("Failed to get connector", e);
throw new IllegalArgumentException(e);
}
@@ -369,7 +377,9 @@ public class GarbageCollectWriteAheadLogs {
/**
* Determine if the given WAL is needed for replication
- * @param wal The full path (URI)
+ *
+ * @param wal
+ * The full path (URI)
* @return True if the WAL is still needed by replication (not a candidate for deletion)
*/
protected boolean neededByReplication(Connector conn, String wal) {
@@ -391,7 +401,7 @@ public class GarbageCollectWriteAheadLogs {
// } catch (Exception e) {
// log.error("Could not read replication table");
// }
-
+
Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
// TODO Push down this filter to the tserver to only return records
@@ -427,7 +437,7 @@ public class GarbageCollectWriteAheadLogs {
try {
Scanner replScanner = ReplicationTable.getScanner(conn);
-
+
// Scan only the Status records
StatusSection.limit(replScanner);
@@ -445,7 +455,8 @@ public class GarbageCollectWriteAheadLogs {
private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
}
- //TODO Remove deprecation warning suppression when Hadoop1 support is dropped
+
+ // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
@SuppressWarnings("deprecation")
/**
* Scans write-ahead log directories for logs. The maps passed in are
@@ -491,22 +502,24 @@ public class GarbageCollectWriteAheadLogs {
}
return servers.size();
}
-
- private Map<String, Path> getSortedWALogs() throws IOException {
+
+ private Map<String,Path> getSortedWALogs() throws IOException {
return getSortedWALogs(ServerConstants.getRecoveryDirs());
}
+
/**
* Looks for write-ahead logs in recovery directories.
- *
- * @param recoveryDirs recovery directories
+ *
+ * @param recoveryDirs
+ * recovery directories
* @return map of log file names to paths
*/
- Map<String, Path> getSortedWALogs(String[] recoveryDirs) throws IOException {
- Map<String, Path> result = new HashMap<String, Path>();
-
+ Map<String,Path> getSortedWALogs(String[] recoveryDirs) throws IOException {
+ Map<String,Path> result = new HashMap<String,Path>();
+
for (String dir : recoveryDirs) {
Path recoveryDir = new Path(dir);
-
+
if (fs.exists(recoveryDir)) {
for (FileStatus status : fs.listStatus(recoveryDir)) {
String name = status.getPath().getName();
@@ -520,11 +533,12 @@ public class GarbageCollectWriteAheadLogs {
}
return result;
}
-
+
/**
* Checks if a string is a valid UUID.
- *
- * @param name string to check
+ *
+ * @param name
+ * string to check
* @return true if string is a UUID
*/
static boolean isUUID(String name) {
@@ -538,5 +552,5 @@ public class GarbageCollectWriteAheadLogs {
return false;
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
index 9362264..0f5cada 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
@@ -231,7 +231,7 @@ public class GarbageCollectionAlgorithm {
// We want to advance both, and try to delete the candidate if we can
candidates.next();
pendingReplication.next();
-
+
// We cannot delete a file if it is still needed for replication
if (!StatusUtil.isSafeForRemoval(pendingReplica.getValue())) {
// If it must be replicated, we must remove it from the candidate set to prevent deletion
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 428a154..4b9d052 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -526,12 +526,12 @@ public class SimpleGarbageCollector implements Iface {
try {
stat = Status.parseFrom(input.getValue().get());
} catch (InvalidProtocolBufferException e) {
- log.warn("Could not deserialize protobuf for: " + input.getKey());
+ log.warn("Could not deserialize protobuf for: " + input.getKey());
stat = null;
}
return Maps.immutableEntry(file, stat);
}
-
+
});
} catch (TableNotFoundException e) {
// No elements that we need to preclude
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index ab877fc..6445294 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -300,6 +300,7 @@ public class GarbageCollectWriteAheadLogsTest {
private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs {
private List<Entry<Key,Value>> replData;
+
/**
* @param instance
* @param fs
@@ -355,7 +356,7 @@ public class GarbageCollectWriteAheadLogsTest {
Instance inst = new MockInstance(testName.getMethodName());
Credentials creds = new Credentials("root", new PasswordToken(""));
Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-
+
GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false);
ReplicationTable.create(conn);
@@ -381,7 +382,7 @@ public class GarbageCollectWriteAheadLogsTest {
GCStatus status = new GCStatus();
GcCycleStats cycleStats = new GcCycleStats();
status.currentLog = cycleStats;
-
+
// We should iterate over two entries
Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status, creds));
@@ -399,7 +400,7 @@ public class GarbageCollectWriteAheadLogsTest {
Instance inst = new MockInstance(testName.getMethodName());
Credentials creds = new Credentials("root", new PasswordToken(""));
Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-
+
GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false);
ReplicationTable.create(conn);
@@ -411,7 +412,7 @@ public class GarbageCollectWriteAheadLogsTest {
Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
bw.addMutation(m);
-
+
m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
bw.addMutation(m);
@@ -427,7 +428,7 @@ public class GarbageCollectWriteAheadLogsTest {
GCStatus status = new GCStatus();
GcCycleStats cycleStats = new GcCycleStats();
status.currentLog = cycleStats;
-
+
// We should iterate over two entries
Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status, creds));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 66152c4..13d2263 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -182,13 +182,13 @@ public class CloseWriteAheadLogReferencesTest {
logEntry.tabletId = 1;
logEntry.logSet = Collections.singleton(logEntry.filename);
data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
+
logEntry.extent = new KeyExtent(new Text("1"), new Text("c"), new Text("b"));
logEntry.server = "tserver1";
logEntry.tabletId = 2;
logEntry.logSet = Collections.singleton(logEntry.filename);
data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
+
logEntry.extent = new KeyExtent(new Text("1"), null, new Text("c"));
logEntry.server = "tserver1";
logEntry.tabletId = 3;
@@ -227,9 +227,8 @@ public class CloseWriteAheadLogReferencesTest {
Connector conn = createMock(Connector.class);
BatchScanner bs = createMock(BatchScanner.class);
- String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+port/" + UUID.randomUUID(),
- file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+port/" + UUID.randomUUID(),
- file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+port/" + UUID.randomUUID();
+ String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+port/" + UUID.randomUUID(), file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+port/"
+ + UUID.randomUUID(), file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+port/" + UUID.randomUUID();
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
@@ -244,32 +243,32 @@ public class CloseWriteAheadLogReferencesTest {
logEntry.extent = new KeyExtent(new Text("5"), null, null);
logEntry.tabletId = 2;
data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
+
logEntry.extent = new KeyExtent(new Text("3"), new Text("b"), new Text("a"));
logEntry.filename = file2;
logEntry.server = "tserver2";
logEntry.tabletId = 3;
logEntry.logSet = Collections.singleton(logEntry.filename);
data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
+
logEntry.extent = new KeyExtent(new Text("3"), new Text("c"), new Text("b"));
logEntry.tabletId = 4;
logEntry.logSet = Collections.singleton(logEntry.filename);
data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
+
logEntry.extent = new KeyExtent(new Text("4"), new Text("5"), new Text("0"));
logEntry.filename = file3;
logEntry.server = "tserver3";
logEntry.tabletId = 5;
logEntry.logSet = Collections.singleton(logEntry.filename);
data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
+
logEntry.extent = new KeyExtent(new Text("4"), new Text("8"), new Text("5"));
logEntry.server = "tserver3";
logEntry.tabletId = 7;
logEntry.logSet = Collections.singleton(logEntry.filename);
data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
+
logEntry.extent = new KeyExtent(new Text("4"), null, new Text("8"));
logEntry.server = "tserver3";
logEntry.tabletId = 15;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 5b8bd8d..4807154 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -994,7 +994,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
// Start the replication coordinator which assigns tservers to service replication requests
ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor = new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(
- TraceWrap.service(new MasterReplicationCoordinator(this, getSystemConfiguration())));
+ TraceWrap.service(new MasterReplicationCoordinator(this)));
ServerAddress replAddress = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT,
replicationCoordinatorProcessor, "Master Replication Coordinator", "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
@@ -1002,7 +1002,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
// Advertise that port we used so peers don't have to be told what it is
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR,
replAddress.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE);
-
+
while (clientService.isServing()) {
UtilWaitThread.sleep(500);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
index d53bc33..9331075 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
@@ -23,7 +23,6 @@ import java.util.Set;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.replication.ReplicationCoordinatorErrorCode;
import org.apache.accumulo.core.replication.thrift.RemoteCoordinationException;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
@@ -46,17 +45,15 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac
private final Master master;
private final Instance inst;
- private final AccumuloConfiguration conf;
private final Random rand;
private final ZooReader reader;
- public MasterReplicationCoordinator(Master master, AccumuloConfiguration conf) {
- this(master, conf, new ZooReader(master.getInstance().getZooKeepers(), master.getInstance().getZooKeepersSessionTimeOut()));
+ public MasterReplicationCoordinator(Master master) {
+ this(master, new ZooReader(master.getInstance().getZooKeepers(), master.getInstance().getZooKeepersSessionTimeOut()));
}
- protected MasterReplicationCoordinator(Master master, AccumuloConfiguration conf, ZooReader reader) {
+ protected MasterReplicationCoordinator(Master master, ZooReader reader) {
this.master = master;
- this.conf = conf;
this.rand = new Random(358923462l);
this.inst = master.getInstance();
this.reader = reader;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index a2212a3..7609ca5 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -62,7 +62,7 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
private Connector conn;
private AccumuloConfiguration conf;
- // @formatter.off
+ // @formatter:off
/*
* {
* peer1 => {sourceTableId1 => work_queue_key1, sourceTableId2 => work_queue_key2, ...}
@@ -70,7 +70,7 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
* ...
* }
*/
- // @formatter.on
+ // @formatter:on
private Map<String,Map<String,String>> queuedWorkByPeerName;
private DistributedWorkQueue workQueue;
@@ -322,7 +322,7 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
try {
log.debug("Queued work for {} and {} from table ID {}", key, path, sourceTableId);
- workQueue.addWork(key, path);
+ workQueue.addWork(key, path);
queuedWorkForPeer.put(sourceTableId, key);
return 1;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java b/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
deleted file mode 100644
index cb45b44..0000000
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.replication;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import com.google.protobuf.TextFormat;
-
-/**
- * Encapsulates a file (path) and {@link Status}
- */
-public class Work implements Writable {
-
- private String file;
-
- private Status status;
-
- public Work() { }
-
- public Work(String file, Status status) {
- this.file = file;
- this.status = status;
- }
-
- public String getFile() {
- return file;
- }
-
- public void setFile(String file) {
- this.file = file;
- }
-
- public Status getStatus() {
- return status;
- }
-
- public void setStatus(Status status) {
- this.status = status;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeString(out, file);
- byte[] bytes = status.toByteArray();
- WritableUtils.writeVInt(out, bytes.length);
- out.write(bytes);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- file = WritableUtils.readString(in);
- int len = WritableUtils.readVInt(in);
- byte[] bytes = new byte[len];
- in.readFully(bytes);
- status = Status.parseFrom(bytes);
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof Work) {
- Work other = (Work) o;
- return file.equals(other.getFile()) && status.equals(other.getStatus());
- }
-
- return false;
- }
-
- @Override
- public String toString() {
- return file + " " + TextFormat.shortDebugString(status);
- }
-
- @Override
- public int hashCode() {
- return file.hashCode() ^ status.hashCode();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index 856153d..68109a6 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -81,18 +81,18 @@ public class WorkMaker {
writer = null;
return;
}
-
+
// Only pull records about data that has been ingested and is ready for replication
StatusSection.limit(s);
-
+
TableConfiguration tableConf;
-
+
Text file = new Text(), tableId = new Text();
for (Entry<Key,Value> entry : s) {
// Extract the useful bits from the status key
ReplicationSchema.StatusSection.getFile(entry.getKey(), file);
ReplicationSchema.StatusSection.getTableId(entry.getKey(), tableId);
- log.info("Processing replication status record for " + file + " on table "+ tableId);
+ log.info("Processing replication status record for " + file + " on table " + tableId);
Status status;
try {
@@ -107,17 +107,17 @@ public class WorkMaker {
if (!shouldCreateWork(status)) {
continue;
}
-
+
// Get the table configuration for the table specified by the status record
tableConf = ServerConfiguration.getTableConfiguration(conn.getInstance(), tableId.toString());
-
+
// Pull the relevant replication targets
// TODO Cache this instead of pulling it every time
Map<String,String> replicationTargets = getReplicationTargets(tableConf);
-
+
// If we have targets, we need to make a work record
// TODO Don't replicate if it's a only a newFile entry (nothing to replicate yet)
- // -- Another scanner over the WorkSection can make this relatively cheap
+ // -- Another scanner over the WorkSection can make this relatively cheap
if (!replicationTargets.isEmpty()) {
Span workSpan = Trace.start("createWorkMutations");
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java
index f4e60e5..d0f1f3f 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/AbstractWorkAssignerTest.java
@@ -33,11 +33,11 @@ public class AbstractWorkAssignerTest {
@Test
public void createsValidZKNodeName() {
- Path p = new Path ("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
+ Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
String key = AbstractWorkAssigner.getQueueKey(p.toString(), target);
-
+
PathUtils.validatePath(key);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/master/src/test/java/org/apache/accumulo/master/replication/MasterReplicationCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/MasterReplicationCoordinatorTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/MasterReplicationCoordinatorTest.java
index e252096..045f542 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/MasterReplicationCoordinatorTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/MasterReplicationCoordinatorTest.java
@@ -38,8 +38,7 @@ public class MasterReplicationCoordinatorTest {
public void randomServer() {
Master master = EasyMock.createMock(Master.class);
ZooReader reader = EasyMock.createMock(ZooReader.class);
- AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
- MasterReplicationCoordinator coordinator = new MasterReplicationCoordinator(master, conf, reader);
+ MasterReplicationCoordinator coordinator = new MasterReplicationCoordinator(master, reader);
TServerInstance inst1 = new TServerInstance(HostAndPort.fromParts("host1", 1234), "session");
Assert.assertEquals(inst1, coordinator.getRandomTServer(Collections.singleton(inst1), 0));
@@ -49,8 +48,7 @@ public class MasterReplicationCoordinatorTest {
public void invalidOffset() {
Master master = EasyMock.createMock(Master.class);
ZooReader reader = EasyMock.createMock(ZooReader.class);
- AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
- MasterReplicationCoordinator coordinator = new MasterReplicationCoordinator(master, conf, reader);
+ MasterReplicationCoordinator coordinator = new MasterReplicationCoordinator(master, reader);
TServerInstance inst1 = new TServerInstance(HostAndPort.fromParts("host1", 1234), "session");
Assert.assertEquals(inst1, coordinator.getRandomTServer(Collections.singleton(inst1), 1));
@@ -60,9 +58,8 @@ public class MasterReplicationCoordinatorTest {
public void randomServerFromMany() {
Master master = EasyMock.createMock(Master.class);
ZooReader reader = EasyMock.createMock(ZooReader.class);
- AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
- MasterReplicationCoordinator coordinator = new MasterReplicationCoordinator(master, conf, reader);
-
+ MasterReplicationCoordinator coordinator = new MasterReplicationCoordinator(master, reader);
+
TreeSet<TServerInstance> instances = new TreeSet<>();
TServerInstance inst1 = new TServerInstance(HostAndPort.fromParts("host1", 1234), "session");
instances.add(inst1);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
index 1cd30f8..ec19a94 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
@@ -62,7 +62,7 @@ public class RemoveCompleteReplicationRecordsTest {
@Rule
public TestName test = new TestName();
-
+
@Before
public void initialize() throws Exception {
inst = new MockInstance(test.getMethodName());
@@ -112,14 +112,13 @@ public class RemoveCompleteReplicationRecordsTest {
for (int i = 0; i < numRecords; i++) {
String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
Mutation m = new Mutation(file);
- StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000*(i+1)).build()));
+ StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
bw.addMutation(m);
}
bw.close();
Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-
BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
bs.setRanges(Collections.singleton(new Range()));
@@ -151,7 +150,7 @@ public class RemoveCompleteReplicationRecordsTest {
for (int i = 0; i < numRecords; i++) {
String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
Mutation m = new Mutation(file);
- StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000*(i+1)).build()));
+ StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
replBw.addMutation(m);
}
@@ -206,7 +205,7 @@ public class RemoveCompleteReplicationRecordsTest {
builder.setCreatedTime(time++);
String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
Mutation m = new Mutation(file);
- Value v = ProtobufUtil.toValue(builder.setBegin(1000*(i+1)).build());
+ Value v = ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build());
StatusSection.add(m, new Text(Integer.toString(i)), v);
replBw.addMutation(m);
m = OrderSection.createMutation(file, time);
@@ -296,7 +295,7 @@ public class RemoveCompleteReplicationRecordsTest {
for (int i = 0; i < numRecords; i++) {
String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
Mutation m = new Mutation(file);
- StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000*(i+1)).build()));
+ StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
replBw.addMutation(m);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
index 4c0ab2b..966e2bd 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
@@ -196,7 +196,7 @@ public class StatusMakerTest {
s.setRange(ReplicationSection.getRange());
s.fetchColumnFamily(ReplicationSection.COLF);
Assert.assertEquals(0, Iterables.size(s));
-
+
}
@Test
@@ -249,7 +249,7 @@ public class StatusMakerTest {
Text buff = new Text();
while (expectedFiles.hasNext() && iter.hasNext()) {
String file = expectedFiles.next();
- Entry<Key,Value> entry = iter.next();
+ Entry<Key,Value> entry = iter.next();
Assert.assertEquals(file, OrderSection.getFile(entry.getKey(), buff));
OrderSection.getTableId(entry.getKey(), buff);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
deleted file mode 100644
index 09530a7..0000000
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.replication;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- *
- */
-public class WorkTest {
-
- @Test
- public void serialization() throws IOException {
- Work w = new Work("/foo/bar", StatusUtil.openWithUnknownLength());
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- w.write(new DataOutputStream(baos));
-
- ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-
- Work newW = new Work();
- newW.readFields(new DataInputStream(bais));
-
- Assert.assertEquals(w, newW);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 209ee08..567b2ad 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3122,7 +3122,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
private HostAndPort startReplicationService() throws UnknownHostException {
- ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance(), getSystemConfiguration()));
+ ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance()));
ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
AccumuloConfiguration conf = getSystemConfiguration();
Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
@@ -3207,7 +3207,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// main loop listens for client requests
public void run() {
SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
-
+
try {
clientAddress = startTabletClientService();
} catch (UnknownHostException e1) {
@@ -3238,6 +3238,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new RuntimeException("Failed to start replication service", e);
}
+ log.info("Started replication service at " + replicationAddress);
+
// Start the pool to handle outgoing replications
ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task");
replWorker.setExecutor(replicationThreadPool);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
index 0824169..45c1409 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
@@ -26,7 +26,6 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.replication.AccumuloReplicationReplayer;
import org.apache.accumulo.core.replication.RemoteReplicationErrorCode;
import org.apache.accumulo.core.replication.thrift.KeyValues;
@@ -38,8 +37,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Use a BatchWriter to replay WAL entries to an Accumulo table. This assumes that all
- * WAL entries are for this table. Pruning out undesired entries is expected to be done by the sender.
+ * Use a BatchWriter to replay WAL entries to an Accumulo table. This assumes that all WAL entries are for this table. Pruning out undesired entries is expected
+ * to be done by the sender.
*/
public class BatchWriterReplicationReplayer implements AccumuloReplicationReplayer {
private static final Logger log = LoggerFactory.getLogger(BatchWriterReplicationReplayer.class);
@@ -61,7 +60,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
log.error("Could not deserialize edit from stream", e);
throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_DESERIALIZE.ordinal(), "Could not deserialize edit from stream");
}
-
+
// Create the batchScanner if we don't already have one.
if (null == bw) {
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 149af6e..9fb7fe5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -144,7 +144,7 @@ public class ReplicationProcessor implements Processor {
Map<String,String> configuredPeers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
String peerType = configuredPeers.get(Property.REPLICATION_PEERS.getKey() + peerName);
if (null == peerType) {
- String msg = "Cannot process replication for unknown peer: " + peerName;
+ String msg = "Cannot process replication for unknown peer: " + peerName;
log.warn(msg);
throw new IllegalArgumentException(msg);
}
@@ -152,19 +152,24 @@ public class ReplicationProcessor implements Processor {
return peerType;
}
- public Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException, InvalidProtocolBufferException {
+ public Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
+ InvalidProtocolBufferException {
Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
s.setRange(Range.exact(file));
s.fetchColumn(WorkSection.NAME, target.toText());
-
+
return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
}
/**
* Record the updated Status for this file and target
- * @param filePath Path to file being replicated
- * @param status Updated Status after replication
- * @param target Peer that was replicated to
+ *
+ * @param filePath
+ * Path to file being replicated
+ * @param status
+ * Updated Status after replication
+ * @param target
+ * Peer that was replicated to
*/
public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
try {
@@ -179,6 +184,6 @@ public class ReplicationProcessor implements Processor {
log.error("Error recording updated Status for {}", filePath, e);
throw new RuntimeException(e);
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
index f70c565..820c586 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
@@ -46,11 +46,9 @@ public class ReplicationServicerHandler implements Iface {
private static final Logger log = LoggerFactory.getLogger(ReplicationServicerHandler.class);
private Instance inst;
- private AccumuloConfiguration conf;
- public ReplicationServicerHandler(Instance inst, AccumuloConfiguration conf) {
+ public ReplicationServicerHandler(Instance inst) {
this.inst = inst;
- this.conf = conf;
}
@Override
@@ -63,7 +61,7 @@ public class ReplicationServicerHandler implements Iface {
String tableName;
try {
- conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+ conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
} catch (AccumuloException | AccumuloSecurityException e) {
log.error("Could not get connection", e);
throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_AUTHENTICATE.ordinal(), "Cannot get connector");
@@ -98,7 +96,8 @@ public class ReplicationServicerHandler implements Iface {
clz = untypedClz.asSubclass(AccumuloReplicationReplayer.class);
} catch (ClassNotFoundException e) {
log.error("Could not instantiate replayer class {}", handlerClassForTable, e);
- throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER.ordinal(), "Could not instantiate replayer class " + handlerClassForTable);
+ throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER.ordinal(), "Could not instantiate replayer class "
+ + handlerClassForTable);
}
// Create an instance
@@ -107,7 +106,8 @@ public class ReplicationServicerHandler implements Iface {
replayer = clz.newInstance();
} catch (InstantiationException | IllegalAccessException e1) {
log.error("Could not instantiate replayer class {}", clz.getName());
- throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER.ordinal(), "Could not instantiate replayer class" + clz.getName());
+ throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER.ordinal(), "Could not instantiate replayer class"
+ + clz.getName());
}
long entriesReplicated = replayer.replicateLog(conn, tableName, data);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
index 1a8909e..df6ec2d 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
@@ -66,7 +66,7 @@ public class LocalWALRecoveryTest {
recovery.parseArgs("--dfs-wal-directory", walTarget.getAbsolutePath());
}
- //@Test
+ @Test
public void testRecoverLocalWriteAheadLogs() throws IOException {
Path targetPath = new Path(walTarget.toURI());
FileSystem fs = FileSystem.get(targetPath.toUri(), new Configuration());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
index e651b4d..07d1201 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
@@ -62,9 +62,8 @@ public class AccumuloReplicaSystemTest {
key.seq = 1l;
/*
- * Disclaimer: the following series of LogFileKey and LogFileValue pairs have *no* bearing
- * whatsoever in reality regarding what these entries would actually look like in a WAL.
- * They are solely for testing that each LogEvents is handled, order is not important.
+ * Disclaimer: the following series of LogFileKey and LogFileValue pairs have *no* bearing whatsoever in reality regarding what these entries would actually
+ * look like in a WAL. They are solely for testing that each LogEvents is handled, order is not important.
*/
key.event = LogEvents.DEFINE_TABLET;
key.tablet = new KeyExtent(new Text("1"), null, null);
@@ -168,9 +167,8 @@ public class AccumuloReplicaSystemTest {
key.seq = 1l;
/*
- * Disclaimer: the following series of LogFileKey and LogFileValue pairs have *no* bearing
- * whatsoever in reality regarding what these entries would actually look like in a WAL.
- * They are solely for testing that each LogEvents is handled, order is not important.
+ * Disclaimer: the following series of LogFileKey and LogFileValue pairs have *no* bearing whatsoever in reality regarding what these entries would actually
+ * look like in a WAL. They are solely for testing that each LogEvents is handled, order is not important.
*/
key.event = LogEvents.DEFINE_TABLET;
key.tablet = new KeyExtent(new Text("1"), null, null);
@@ -275,7 +273,7 @@ public class AccumuloReplicaSystemTest {
LogFileValue value = new LogFileValue();
value.mutations = new ArrayList<>();
-
+
Mutation m = new Mutation("row");
m.put("", "", new Value(new byte[0]));
value.mutations.add(m);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index ae05800..df4845c 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -39,7 +39,7 @@ public class ReplicationProcessorTest {
Instance inst = EasyMock.createMock(Instance.class);
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
Credentials creds = new Credentials("foo", new PasswordToken("bar"));
-
+
Map<String,String> data = new HashMap<>();
String peerName = "peer";
@@ -57,7 +57,7 @@ public class ReplicationProcessorTest {
Instance inst = EasyMock.createMock(Instance.class);
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
Credentials creds = new Credentials("foo", new PasswordToken("bar"));
-
+
Map<String,String> data = new HashMap<>();
ConfigurationCopy conf = new ConfigurationCopy(data);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
index cafd9b7..fa6ca2f 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
@@ -51,7 +51,6 @@ public class MockReplicaSystem implements ReplicaSystem {
Thread.currentThread().interrupt();
return status;
}
-
Status newStatus = builder.build();
log.info("Received {} returned {}", TextFormat.shortDebugString(status), TextFormat.shortDebugString(newStatus));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
index 032e519..d8ef56f 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
@@ -99,7 +99,7 @@ public class CyclicReplicationIT {
master1Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
MiniAccumuloClusterImpl master1Cluster = master1Cfg.build();
setCoreSite(master1Cluster);
-
+
MiniAccumuloConfigImpl master2Cfg = new MiniAccumuloConfigImpl(master2Dir, password);
master2Cfg.setNumTservers(1);
master2Cfg.setInstanceName("master2");
@@ -126,7 +126,7 @@ public class CyclicReplicationIT {
Property.REPLICATION_PEERS.getKey() + master1Cluster.getInstanceName(),
ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
AccumuloReplicaSystem.buildConfiguration(master1Cluster.getInstanceName(), master1Cluster.getZooKeepers())));
-
+
connMaster1.tableOperations().create(master1Cluster.getInstanceName(), false);
String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Cluster.getInstanceName());
Assert.assertNotNull(master1TableId);
@@ -137,11 +137,13 @@ public class CyclicReplicationIT {
// Replicate master1 in the master1 cluster to master2 in the master2 cluster
connMaster1.tableOperations().setProperty(master1Cluster.getInstanceName(), Property.TABLE_REPLICATION.getKey(), "true");
- connMaster1.tableOperations().setProperty(master1Cluster.getInstanceName(), Property.TABLE_REPLICATION_TARGETS.getKey() + master2Cluster.getInstanceName(), master2TableId);
+ connMaster1.tableOperations().setProperty(master1Cluster.getInstanceName(),
+ Property.TABLE_REPLICATION_TARGETS.getKey() + master2Cluster.getInstanceName(), master2TableId);
// Replicate master2 in the master2 cluster to master1 in the master2 cluster
connMaster2.tableOperations().setProperty(master2Cluster.getInstanceName(), Property.TABLE_REPLICATION.getKey(), "true");
- connMaster2.tableOperations().setProperty(master2Cluster.getInstanceName(), Property.TABLE_REPLICATION_TARGETS.getKey() + master1Cluster.getInstanceName(), master1TableId);
+ connMaster2.tableOperations().setProperty(master2Cluster.getInstanceName(),
+ Property.TABLE_REPLICATION_TARGETS.getKey() + master1Cluster.getInstanceName(), master1TableId);
IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class);
SummingCombiner.setEncodingType(summingCombiner, Type.STRING);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/178ffe97/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
index d43aa32..418d717 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
@@ -97,7 +97,7 @@ public class ReplicationDeadlockTest extends ConfigurableMacIT {
}
}
}
-
+
});
t.start();
@@ -160,10 +160,11 @@ public class ReplicationDeadlockTest extends ConfigurableMacIT {
keepRunning.set(false);
t.join(5000);
-
+
for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {
Scanner s = conn.createScanner(table, new Authorizations());
- for (@SuppressWarnings("unused") Entry<Key,Value> entry : s) {}
+ for (@SuppressWarnings("unused")
+ Entry<Key,Value> entry : s) {}
}
}
}
[2/8] git commit: Update iterator javadoc
Posted by el...@apache.org.
Update iterator javadoc
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b2eaf13b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b2eaf13b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b2eaf13b
Branch: refs/heads/ACCUMULO-378
Commit: b2eaf13b8abbedc8a1a5b80cae469f20b7587842
Parents: 73a9e6c
Author: Ryan Leary <rl...@bbn.com>
Authored: Tue May 20 10:02:50 2014 -0400
Committer: Ryan Leary <rl...@bbn.com>
Committed: Tue May 20 10:02:50 2014 -0400
----------------------------------------------------------------------
.../iterators/user/RowEncodingIterator.java | 22 +++++++++++++-------
.../core/iterators/user/WholeRowIterator.java | 19 ++++++++++-------
2 files changed, 27 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2eaf13b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
index dff1e04..f083eb3 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
@@ -34,17 +34,25 @@ import org.apache.hadoop.io.Text;
/**
*
- * The WholeRowIterator is designed to provide row-isolation so that queries see mutations as atomic. It does so by encapsulating an entire row of key/value
- * pairs into a single key/value pair, which is returned through the client as an atomic operation.
+ * The RowEncodingIterator is designed to provide row-isolation so that queries see mutations as atomic.
+ * It does so by encapsulating an entire row of key/value pairs into a single key/value pair, which is
+ * returned through the client as an atomic operation. This is an abstract class, allowing the user to
+ * implement rowEncoder and rowDecoder such that the columns and values of a given row may be encoded
+ * in a format best suited to the client.
*
* <p>
- * One caveat is that when seeking in the WholeRowIterator using a range that starts at a non-inclusive first key in a row, (e.g. seek(new Range(new Key(new
- * Text("row")),false,...),...)) this iterator will skip to the next row. This is done in order to prevent repeated scanning of the same row when system
- * automatically creates ranges of that form, which happens in the case of the client calling continueScan, or in the case of the tablet server continuing a
- * scan after swapping out sources.
+ * For an example implementation, see {@link WholeRowIterator}.
+ *
+ * <p>
+ * One caveat is that when seeking in the WholeRowIterator using a range that starts at a non-inclusive
+ * first key in a row, (e.g. seek(new Range(new Key(new Text("row")),false,...),...)) this iterator will
+ * skip to the next row. This is done in order to prevent repeated scanning of the same row when system
+ * automatically creates ranges of that form, which happens in the case of the client calling
+ * continueScan, or in the case of the tablet server continuing a scan after swapping out sources.
*
* <p>
- * To regain the original key/value pairs of the row, call the decodeRow function on the key/value pair that this iterator returned.
+ * To regain the original key/value pairs of the row, call the rowDecoder function on the key/value
+ * pair that this iterator returned.
*
* @see RowFilter
*/
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2eaf13b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
index c8bceea..060a004 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
@@ -33,17 +33,22 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
/**
*
- * The WholeRowIterator is designed to provide row-isolation so that queries see mutations as atomic. It does so by encapsulating an entire row of key/value
- * pairs into a single key/value pair, which is returned through the client as an atomic operation.
+ * The WholeRowIterator is designed to provide row-isolation so that queries see mutations as atomic.
+ * It does so by encapsulating an entire row of key/value pairs into a single key/value pair, which
+ * is returned through the client as an atomic operation.
*
* <p>
- * One caveat is that when seeking in the WholeRowIterator using a range that starts at a non-inclusive first key in a row, (e.g. seek(new Range(new Key(new
- * Text("row")),false,...),...)) this iterator will skip to the next row. This is done in order to prevent repeated scanning of the same row when system
- * automatically creates ranges of that form, which happens in the case of the client calling continueScan, or in the case of the tablet server continuing a
- * scan after swapping out sources.
+ * This iterator extends the {@link RowEncodingIterator}, providing implementations for rowEncoder
+ * and rowDecoder which serializes all column and value information from a given row into a
+ * single ByteStream in a value.
*
* <p>
- * To regain the original key/value pairs of the row, call the decodeRow function on the key/value pair that this iterator returned.
+ * As with the RowEncodingIterator, when seeking in the WholeRowIterator using a range that starts
+ * at a non-inclusive first key in a row, this iterator will skip to the next row.
+ *
+ * <p>
+ * To regain the original key/value pairs of the row, call the decodeRow function on the key/value
+ * pair that this iterator returned.
*
* @see RowFilter
*/