You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/05/06 23:07:04 UTC
[1/3] accumulo git commit: ACCUMULO-3761 RowEncodingIterator should take a maximum buffer size parameter
Repository: accumulo
Updated Branches:
refs/heads/1.7 2938cfad9 -> dfa5255ce
refs/heads/master 46f76023b -> adacc2e5a
ACCUMULO-3761 RowEncodingIterator should take a maximum buffer size parameter
Added the optional parameter maxBufferSize to the RowEncodingIterator. This parameter specifies the maximum size to which the RowEncodingIterator's buffer may grow as it collects the key/value pairs of a row to be encoded; if the limit is exceeded, a BufferOverflowException is thrown. The name, encoding and behaviour of the maxBufferSize parameter match the TransformingIterator.
Discussion is here: http://www.mail-archive.com/dev%40accumulo.apache.org/msg09821.html
Added maxBufferSize parameter for RowEncodingIterator based on implementation of TransformingIterator; still needs tests
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dfa5255c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dfa5255c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dfa5255c
Branch: refs/heads/1.7
Commit: dfa5255cee3219cfaf560e949386da23bd9135d6
Parents: 2938cfa
Author: Russ Weeks <rw...@newbrightidea.com>
Authored: Fri Apr 10 00:10:32 2015 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 6 16:43:14 2015 -0400
----------------------------------------------------------------------
.../iterators/user/RowEncodingIterator.java | 56 +++++-
.../core/iterators/user/WholeRowIterator.java | 7 -
.../iterators/user/RowEncodingIteratorTest.java | 201 +++++++++++++++++++
3 files changed, 253 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfa5255c/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
index f776569..8a36bef 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
@@ -22,14 +22,18 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
+import java.util.HashMap;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.commons.collections.BufferOverflowException;
import org.apache.hadoop.io.Text;
/**
@@ -52,11 +56,15 @@ import org.apache.hadoop.io.Text;
*
* @see RowFilter
*/
-public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,Value> {
+public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+
+ public static final String MAX_BUFFER_SIZE_OPT = "maxBufferSize";
+ private static final long DEFAULT_MAX_BUFFER_SIZE = Long.MAX_VALUE;
protected SortedKeyValueIterator<Key,Value> sourceIter;
private Key topKey = null;
private Value topValue = null;
+ private long maxBufferSize = DEFAULT_MAX_BUFFER_SIZE;
// decode a bunch of key value pairs that have been encoded into a single value
/**
@@ -71,12 +79,23 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
public abstract Value rowEncoder(List<Key> keys, List<Value> values) throws IOException;
@Override
- public abstract SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env);
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ RowEncodingIterator newInstance;
+ try {
+ newInstance = this.getClass().newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ newInstance.sourceIter = sourceIter.deepCopy(env);
+ newInstance.maxBufferSize = maxBufferSize;
+ return newInstance;
+ }
List<Key> keys = new ArrayList<Key>();
List<Value> values = new ArrayList<Value>();
private void prepKeys() throws IOException {
+ long kvBufSize = 0;
if (topKey != null)
return;
Text currentRow;
@@ -87,8 +106,14 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
keys.clear();
values.clear();
while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow)) {
- keys.add(new Key(sourceIter.getTopKey()));
- values.add(new Value(sourceIter.getTopValue()));
+ Key sourceTopKey = sourceIter.getTopKey();
+ Value sourceTopValue = sourceIter.getTopValue();
+ keys.add(new Key(sourceTopKey));
+ values.add(new Value(sourceTopValue));
+ kvBufSize += sourceTopKey.getSize() + sourceTopValue.getSize() + 128;
+ if (kvBufSize > maxBufferSize) {
+ throw new BufferOverflowException("Exceeded buffer size of " + maxBufferSize + " for row: " + sourceTopKey.getRow().toString());
+ }
sourceIter.next();
}
} while (!filter(currentRow, keys, values));
@@ -129,6 +154,29 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
sourceIter = source;
+ if (options.containsKey(MAX_BUFFER_SIZE_OPT)) {
+ maxBufferSize = AccumuloConfiguration.getMemoryInBytes(options.get(MAX_BUFFER_SIZE_OPT));
+ }
+ }
+
+ @Override
+ public IteratorOptions describeOptions() {
+ String desc = "This iterator encapsulates an entire row of Key/Value pairs into a single Key/Value pair.";
+ String bufferDesc = "Maximum buffer size (in accumulo memory spec) to use for buffering keys before throwing a BufferOverflowException.";
+ HashMap<String,String> namedOptions = new HashMap<String,String>();
+ namedOptions.put(MAX_BUFFER_SIZE_OPT, bufferDesc);
+ return new IteratorOptions(getClass().getSimpleName(), desc, namedOptions, null);
+ }
+
+ @Override
+ public boolean validateOptions(Map<String, String> options) {
+ String maxBufferSizeStr = options.get(MAX_BUFFER_SIZE_OPT);
+ try {
+ AccumuloConfiguration.getMemoryInBytes(maxBufferSizeStr);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Failed to parse opt " + MAX_BUFFER_SIZE_OPT + " " + maxBufferSizeStr, e);
+ }
+ return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfa5255c/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
index 7c47ec3..9f5a6b1 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
@@ -57,13 +57,6 @@ public class WholeRowIterator extends RowEncodingIterator {
}
@Override
- public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- if (sourceIter != null)
- return new WholeRowIterator(sourceIter.deepCopy(env));
- return new WholeRowIterator();
- }
-
- @Override
public SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException {
return decodeRow(rowKey, rowValue);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfa5255c/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
new file mode 100644
index 0000000..8f228f5
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.commons.collections.BufferOverflowException;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class RowEncodingIteratorTest {
+
+ private static final class DummyIteratorEnv implements IteratorEnvironment {
+ @Override
+ public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
+ return null;
+ }
+
+ @Override
+ public AccumuloConfiguration getConfig() {
+ return null;
+ }
+
+ @Override
+ public IteratorUtil.IteratorScope getIteratorScope() {
+ return IteratorUtil.IteratorScope.scan;
+ }
+
+ @Override
+ public boolean isFullMajorCompaction() {
+ return false;
+ }
+
+ @Override
+ public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
+
+ }
+
+ @Override
+ public Authorizations getAuthorizations() {
+ return null;
+ }
+ }
+
+ private static final class RowEncodingIteratorImpl extends RowEncodingIterator {
+
+ public static SortedMap<Key,Value> decodeRow(Key rowKey, Value rowValue) throws IOException {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(rowValue.get()));
+ int numKeys = dis.readInt();
+ List<Key> decodedKeys = new ArrayList<Key>();
+ List<Value> decodedValues = new ArrayList<Value>();
+ SortedMap<Key,Value> out = new TreeMap<Key,Value>();
+ for (int i = 0; i < numKeys; i++) {
+ Key k = new Key();
+ k.readFields(dis);
+ decodedKeys.add(k);
+ }
+ int numValues = dis.readInt();
+ for (int i = 0; i < numValues; i++) {
+ Value v = new Value();
+ v.readFields(dis);
+ decodedValues.add(v);
+ }
+ if (decodedKeys.size() != decodedValues.size()) {
+ throw new IOException("Number of keys doesn't match number of values");
+ }
+ for (int i = 0; i < decodedKeys.size(); i++) {
+ out.put(decodedKeys.get(i), decodedValues.get(i));
+ }
+ return out;
+ }
+
+ @Override
+ public SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException {
+ return decodeRow(rowKey, rowValue);
+ }
+
+ @Override
+ public Value rowEncoder(List<Key> keys, List<Value> values) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeInt(keys.size());
+ for (Key key : keys) {
+ key.write(dos);
+ }
+ dos.writeInt(values.size());
+ for (Value v : values) {
+ v.write(dos);
+ }
+ dos.flush();
+ return new Value(baos.toByteArray());
+ }
+ }
+
+ private void pkv(SortedMap<Key,Value> map, String row, String cf, String cq, String cv, long ts, byte[] val) {
+ map.put(new Key(new Text(row), new Text(cf), new Text(cq), new Text(cv), ts), new Value(val, true));
+ }
+
+ @Test
+ public void testEncodeAll() throws IOException {
+ byte[] kbVal = new byte[1024];
+ // This code is shamelessly borrowed from the WholeRowIteratorTest.
+ SortedMap<Key,Value> map1 = new TreeMap<Key,Value>();
+ pkv(map1, "row1", "cf1", "cq1", "cv1", 5, kbVal);
+ pkv(map1, "row1", "cf1", "cq2", "cv1", 6, kbVal);
+
+ SortedMap<Key,Value> map2 = new TreeMap<Key,Value>();
+ pkv(map2, "row2", "cf1", "cq1", "cv1", 5, kbVal);
+ pkv(map2, "row2", "cf1", "cq2", "cv1", 6, kbVal);
+
+ SortedMap<Key,Value> map3 = new TreeMap<Key,Value>();
+ pkv(map3, "row3", "cf1", "cq1", "cv1", 5, kbVal);
+ pkv(map3, "row3", "cf1", "cq2", "cv1", 6, kbVal);
+
+ SortedMap<Key,Value> map = new TreeMap<Key,Value>();
+ map.putAll(map1);
+ map.putAll(map2);
+ map.putAll(map3);
+ SortedMapIterator src = new SortedMapIterator(map);
+ Range range = new Range(new Text("row1"), true, new Text("row2"), true);
+ RowEncodingIteratorImpl iter = new RowEncodingIteratorImpl();
+ Map<String,String> bigBufferOpts = new HashMap<String,String>();
+ bigBufferOpts.put(RowEncodingIterator.MAX_BUFFER_SIZE_OPT, "3K");
+ iter.init(src, bigBufferOpts, new DummyIteratorEnv());
+ iter.seek(range, new ArrayList<ByteSequence>(), false);
+
+ assertTrue(iter.hasTop());
+ assertEquals(map1, RowEncodingIteratorImpl.decodeRow(iter.getTopKey(), iter.getTopValue()));
+
+ // simulate something continuing using the last key from the iterator
+ // this is what client and server code will do
+ range = new Range(iter.getTopKey(), false, range.getEndKey(), range.isEndKeyInclusive());
+ iter.seek(range, new ArrayList<ByteSequence>(), false);
+
+ assertTrue(iter.hasTop());
+ assertEquals(map2, RowEncodingIteratorImpl.decodeRow(iter.getTopKey(), iter.getTopValue()));
+
+ iter.next();
+
+ assertFalse(iter.hasTop());
+ }
+
+ @Test(expected = BufferOverflowException.class)
+ public void testEncodeSome() throws IOException {
+ byte[] kbVal = new byte[1024];
+ // This code is shamelessly borrowed from the WholeRowIteratorTest.
+ SortedMap<Key,Value> map1 = new TreeMap<Key,Value>();
+ pkv(map1, "row1", "cf1", "cq1", "cv1", 5, kbVal);
+ pkv(map1, "row1", "cf1", "cq2", "cv1", 6, kbVal);
+
+ SortedMap<Key,Value> map = new TreeMap<Key,Value>();
+ map.putAll(map1);
+ SortedMapIterator src = new SortedMapIterator(map);
+ Range range = new Range(new Text("row1"), true, new Text("row2"), true);
+ RowEncodingIteratorImpl iter = new RowEncodingIteratorImpl();
+ Map<String,String> bigBufferOpts = new HashMap<String,String>();
+ bigBufferOpts.put(RowEncodingIterator.MAX_BUFFER_SIZE_OPT, "1K");
+ iter.init(src, bigBufferOpts, new DummyIteratorEnv());
+ iter.seek(range, new ArrayList<ByteSequence>(), false);
+ // BufferOverflowException should be thrown as RowEncodingIterator can't fit the whole row into its buffer.
+ }
+}
[2/3] accumulo git commit: ACCUMULO-3761 RowEncodingIterator should take a maximum buffer size parameter
Posted by el...@apache.org.
ACCUMULO-3761 RowEncodingIterator should take a maximum buffer size parameter
Added the optional parameter maxBufferSize to the RowEncodingIterator. This parameter specifies the maximum size to which the RowEncodingIterator's buffer may grow as it collects the key/value pairs of a row to be encoded; if the limit is exceeded, a BufferOverflowException is thrown. The name, encoding and behaviour of the maxBufferSize parameter match the TransformingIterator.
Discussion is here: http://www.mail-archive.com/dev%40accumulo.apache.org/msg09821.html
Added maxBufferSize parameter for RowEncodingIterator based on implementation of TransformingIterator; still needs tests
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dfa5255c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dfa5255c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dfa5255c
Branch: refs/heads/master
Commit: dfa5255cee3219cfaf560e949386da23bd9135d6
Parents: 2938cfa
Author: Russ Weeks <rw...@newbrightidea.com>
Authored: Fri Apr 10 00:10:32 2015 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 6 16:43:14 2015 -0400
----------------------------------------------------------------------
.../iterators/user/RowEncodingIterator.java | 56 +++++-
.../core/iterators/user/WholeRowIterator.java | 7 -
.../iterators/user/RowEncodingIteratorTest.java | 201 +++++++++++++++++++
3 files changed, 253 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfa5255c/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
index f776569..8a36bef 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
@@ -22,14 +22,18 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
+import java.util.HashMap;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.commons.collections.BufferOverflowException;
import org.apache.hadoop.io.Text;
/**
@@ -52,11 +56,15 @@ import org.apache.hadoop.io.Text;
*
* @see RowFilter
*/
-public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,Value> {
+public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+
+ public static final String MAX_BUFFER_SIZE_OPT = "maxBufferSize";
+ private static final long DEFAULT_MAX_BUFFER_SIZE = Long.MAX_VALUE;
protected SortedKeyValueIterator<Key,Value> sourceIter;
private Key topKey = null;
private Value topValue = null;
+ private long maxBufferSize = DEFAULT_MAX_BUFFER_SIZE;
// decode a bunch of key value pairs that have been encoded into a single value
/**
@@ -71,12 +79,23 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
public abstract Value rowEncoder(List<Key> keys, List<Value> values) throws IOException;
@Override
- public abstract SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env);
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ RowEncodingIterator newInstance;
+ try {
+ newInstance = this.getClass().newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ newInstance.sourceIter = sourceIter.deepCopy(env);
+ newInstance.maxBufferSize = maxBufferSize;
+ return newInstance;
+ }
List<Key> keys = new ArrayList<Key>();
List<Value> values = new ArrayList<Value>();
private void prepKeys() throws IOException {
+ long kvBufSize = 0;
if (topKey != null)
return;
Text currentRow;
@@ -87,8 +106,14 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
keys.clear();
values.clear();
while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow)) {
- keys.add(new Key(sourceIter.getTopKey()));
- values.add(new Value(sourceIter.getTopValue()));
+ Key sourceTopKey = sourceIter.getTopKey();
+ Value sourceTopValue = sourceIter.getTopValue();
+ keys.add(new Key(sourceTopKey));
+ values.add(new Value(sourceTopValue));
+ kvBufSize += sourceTopKey.getSize() + sourceTopValue.getSize() + 128;
+ if (kvBufSize > maxBufferSize) {
+ throw new BufferOverflowException("Exceeded buffer size of " + maxBufferSize + " for row: " + sourceTopKey.getRow().toString());
+ }
sourceIter.next();
}
} while (!filter(currentRow, keys, values));
@@ -129,6 +154,29 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
sourceIter = source;
+ if (options.containsKey(MAX_BUFFER_SIZE_OPT)) {
+ maxBufferSize = AccumuloConfiguration.getMemoryInBytes(options.get(MAX_BUFFER_SIZE_OPT));
+ }
+ }
+
+ @Override
+ public IteratorOptions describeOptions() {
+ String desc = "This iterator encapsulates an entire row of Key/Value pairs into a single Key/Value pair.";
+ String bufferDesc = "Maximum buffer size (in accumulo memory spec) to use for buffering keys before throwing a BufferOverflowException.";
+ HashMap<String,String> namedOptions = new HashMap<String,String>();
+ namedOptions.put(MAX_BUFFER_SIZE_OPT, bufferDesc);
+ return new IteratorOptions(getClass().getSimpleName(), desc, namedOptions, null);
+ }
+
+ @Override
+ public boolean validateOptions(Map<String, String> options) {
+ String maxBufferSizeStr = options.get(MAX_BUFFER_SIZE_OPT);
+ try {
+ AccumuloConfiguration.getMemoryInBytes(maxBufferSizeStr);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Failed to parse opt " + MAX_BUFFER_SIZE_OPT + " " + maxBufferSizeStr, e);
+ }
+ return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfa5255c/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
index 7c47ec3..9f5a6b1 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
@@ -57,13 +57,6 @@ public class WholeRowIterator extends RowEncodingIterator {
}
@Override
- public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- if (sourceIter != null)
- return new WholeRowIterator(sourceIter.deepCopy(env));
- return new WholeRowIterator();
- }
-
- @Override
public SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException {
return decodeRow(rowKey, rowValue);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfa5255c/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
new file mode 100644
index 0000000..8f228f5
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.commons.collections.BufferOverflowException;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class RowEncodingIteratorTest {
+
+ private static final class DummyIteratorEnv implements IteratorEnvironment {
+ @Override
+ public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
+ return null;
+ }
+
+ @Override
+ public AccumuloConfiguration getConfig() {
+ return null;
+ }
+
+ @Override
+ public IteratorUtil.IteratorScope getIteratorScope() {
+ return IteratorUtil.IteratorScope.scan;
+ }
+
+ @Override
+ public boolean isFullMajorCompaction() {
+ return false;
+ }
+
+ @Override
+ public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
+
+ }
+
+ @Override
+ public Authorizations getAuthorizations() {
+ return null;
+ }
+ }
+
+ private static final class RowEncodingIteratorImpl extends RowEncodingIterator {
+
+ public static SortedMap<Key,Value> decodeRow(Key rowKey, Value rowValue) throws IOException {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(rowValue.get()));
+ int numKeys = dis.readInt();
+ List<Key> decodedKeys = new ArrayList<Key>();
+ List<Value> decodedValues = new ArrayList<Value>();
+ SortedMap<Key,Value> out = new TreeMap<Key,Value>();
+ for (int i = 0; i < numKeys; i++) {
+ Key k = new Key();
+ k.readFields(dis);
+ decodedKeys.add(k);
+ }
+ int numValues = dis.readInt();
+ for (int i = 0; i < numValues; i++) {
+ Value v = new Value();
+ v.readFields(dis);
+ decodedValues.add(v);
+ }
+ if (decodedKeys.size() != decodedValues.size()) {
+ throw new IOException("Number of keys doesn't match number of values");
+ }
+ for (int i = 0; i < decodedKeys.size(); i++) {
+ out.put(decodedKeys.get(i), decodedValues.get(i));
+ }
+ return out;
+ }
+
+ @Override
+ public SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException {
+ return decodeRow(rowKey, rowValue);
+ }
+
+ @Override
+ public Value rowEncoder(List<Key> keys, List<Value> values) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeInt(keys.size());
+ for (Key key : keys) {
+ key.write(dos);
+ }
+ dos.writeInt(values.size());
+ for (Value v : values) {
+ v.write(dos);
+ }
+ dos.flush();
+ return new Value(baos.toByteArray());
+ }
+ }
+
+ private void pkv(SortedMap<Key,Value> map, String row, String cf, String cq, String cv, long ts, byte[] val) {
+ map.put(new Key(new Text(row), new Text(cf), new Text(cq), new Text(cv), ts), new Value(val, true));
+ }
+
+ @Test
+ public void testEncodeAll() throws IOException {
+ byte[] kbVal = new byte[1024];
+ // This code is shamelessly borrowed from the WholeRowIteratorTest.
+ SortedMap<Key,Value> map1 = new TreeMap<Key,Value>();
+ pkv(map1, "row1", "cf1", "cq1", "cv1", 5, kbVal);
+ pkv(map1, "row1", "cf1", "cq2", "cv1", 6, kbVal);
+
+ SortedMap<Key,Value> map2 = new TreeMap<Key,Value>();
+ pkv(map2, "row2", "cf1", "cq1", "cv1", 5, kbVal);
+ pkv(map2, "row2", "cf1", "cq2", "cv1", 6, kbVal);
+
+ SortedMap<Key,Value> map3 = new TreeMap<Key,Value>();
+ pkv(map3, "row3", "cf1", "cq1", "cv1", 5, kbVal);
+ pkv(map3, "row3", "cf1", "cq2", "cv1", 6, kbVal);
+
+ SortedMap<Key,Value> map = new TreeMap<Key,Value>();
+ map.putAll(map1);
+ map.putAll(map2);
+ map.putAll(map3);
+ SortedMapIterator src = new SortedMapIterator(map);
+ Range range = new Range(new Text("row1"), true, new Text("row2"), true);
+ RowEncodingIteratorImpl iter = new RowEncodingIteratorImpl();
+ Map<String,String> bigBufferOpts = new HashMap<String,String>();
+ bigBufferOpts.put(RowEncodingIterator.MAX_BUFFER_SIZE_OPT, "3K");
+ iter.init(src, bigBufferOpts, new DummyIteratorEnv());
+ iter.seek(range, new ArrayList<ByteSequence>(), false);
+
+ assertTrue(iter.hasTop());
+ assertEquals(map1, RowEncodingIteratorImpl.decodeRow(iter.getTopKey(), iter.getTopValue()));
+
+ // simulate something continuing using the last key from the iterator
+ // this is what client and server code will do
+ range = new Range(iter.getTopKey(), false, range.getEndKey(), range.isEndKeyInclusive());
+ iter.seek(range, new ArrayList<ByteSequence>(), false);
+
+ assertTrue(iter.hasTop());
+ assertEquals(map2, RowEncodingIteratorImpl.decodeRow(iter.getTopKey(), iter.getTopValue()));
+
+ iter.next();
+
+ assertFalse(iter.hasTop());
+ }
+
+ @Test(expected = BufferOverflowException.class)
+ public void testEncodeSome() throws IOException {
+ byte[] kbVal = new byte[1024];
+ // This code is shamelessly borrowed from the WholeRowIteratorTest.
+ SortedMap<Key,Value> map1 = new TreeMap<Key,Value>();
+ pkv(map1, "row1", "cf1", "cq1", "cv1", 5, kbVal);
+ pkv(map1, "row1", "cf1", "cq2", "cv1", 6, kbVal);
+
+ SortedMap<Key,Value> map = new TreeMap<Key,Value>();
+ map.putAll(map1);
+ SortedMapIterator src = new SortedMapIterator(map);
+ Range range = new Range(new Text("row1"), true, new Text("row2"), true);
+ RowEncodingIteratorImpl iter = new RowEncodingIteratorImpl();
+ Map<String,String> bigBufferOpts = new HashMap<String,String>();
+ bigBufferOpts.put(RowEncodingIterator.MAX_BUFFER_SIZE_OPT, "1K");
+ iter.init(src, bigBufferOpts, new DummyIteratorEnv());
+ iter.seek(range, new ArrayList<ByteSequence>(), false);
+ // BufferOverflowException should be thrown as RowEncodingIterator can't fit the whole row into its buffer.
+ }
+}
[3/3] accumulo git commit: Merge branch '1.7'
Posted by el...@apache.org.
Merge branch '1.7'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/adacc2e5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/adacc2e5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/adacc2e5
Branch: refs/heads/master
Commit: adacc2e5a096d0c463e3027d386b945e5cc0cfaa
Parents: 46f7602 dfa5255
Author: Josh Elser <el...@apache.org>
Authored: Wed May 6 17:06:26 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 6 17:06:26 2015 -0400
----------------------------------------------------------------------
.../iterators/user/RowEncodingIterator.java | 56 +++++-
.../core/iterators/user/WholeRowIterator.java | 7 -
.../iterators/user/RowEncodingIteratorTest.java | 201 +++++++++++++++++++
3 files changed, 253 insertions(+), 11 deletions(-)
----------------------------------------------------------------------