You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2017/10/26 14:25:33 UTC
[fluo] branch master updated: fixes #946 Added mem efficient col
buffer for GCiter (#952)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/master by this push:
new 51dc912 fixes #946 Added mem efficient col buffer for GCiter (#952)
51dc912 is described below
commit 51dc9123210998c0548f8177ca5efc57ecce405a
Author: Joseph Koshakow <ko...@gmail.com>
AuthorDate: Thu Oct 26 10:22:11 2017 -0400
fixes #946 Added mem efficient col buffer for GCiter (#952)
---
.../fluo/accumulo/iterators/ColumnBuffer.java | 138 +++++++++++++++++++++
.../iterators/GarbageCollectionIterator.java | 46 +++----
.../fluo/accumulo/iterators/ColumnBufferTest.java | 38 ++++++
3 files changed, 193 insertions(+), 29 deletions(-)
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java
new file mode 100644
index 0000000..97be335
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.lang.IllegalArgumentException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.function.LongPredicate;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+
+/**
+ * Buffers Keys that all share the same row, column family, column qualifier and column
+ * visibility. Internally it stores a single Key, a primitive array of timestamps and a list of
+ * values, instead of one Key+Value object per entry. Each Key+Value is materialized on demand by
+ * {@link #getKey(int)} and {@link #getValue(int)}.
+ *
+ * <p>
+ * NOTE(review): every materialized Key is a copy of the first Key added (with its timestamp
+ * replaced), so per-entry Key fields other than the timestamp, e.g. the deleted flag, are not
+ * preserved. Confirm callers never buffer Keys that differ only in such fields.
+ */
+class ColumnBuffer {
+
+  private static final int INITIAL_CAPACITY = 8;
+
+  // The first Key added; supplies row+column for every materialized Key. Null when empty.
+  private Key key;
+  // Primitive storage avoids boxing one Long object per buffered entry. Only the first
+  // 'size' slots are meaningful.
+  private long[] timeStamps;
+  private int size;
+  // values.get(i) is the value paired with timeStamps[i].
+  private ArrayList<byte[]> values;
+
+  public ColumnBuffer() {
+    this.key = null;
+    this.timeStamps = new long[INITIAL_CAPACITY];
+    this.size = 0;
+    this.values = new ArrayList<>();
+  }
+
+  /**
+   * Appends a timestamp/value pair without checking the row+column.
+   *
+   * @param timestamp Timestamp to be added to buffer
+   * @param v Value to be added to buffer; stored without copying
+   */
+  private void add(long timestamp, byte[] v) {
+    if (size == timeStamps.length) {
+      // Grow by roughly 1.5x (as ArrayList does) so appends stay amortized O(1).
+      timeStamps = Arrays.copyOf(timeStamps, timeStamps.length + (timeStamps.length >> 1) + 1);
+    }
+    timeStamps[size++] = timestamp;
+    values.add(v);
+  }
+
+  /**
+   * Adds a Key and its value. When the buffer is empty, the first Key added sets the row+column
+   * (row, column family, column qualifier and column visibility). After this all Keys added must
+   * have the same row+column.
+   *
+   * @param k Key to be added to buffer
+   * @param vByte Value bytes to be added to buffer; a copy is stored, so the caller may reuse
+   *        the array
+   * @throws IllegalArgumentException if k has a different row+column than the buffered Keys
+   */
+  public void add(Key k, byte[] vByte) throws IllegalArgumentException {
+    // Copy first so a null value fails before any state is changed.
+    byte[] valueCopy = Arrays.copyOf(vByte, vByte.length);
+
+    if (key == null) {
+      key = new Key(k);
+    } else if (!key.equals(k, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+      throw new IllegalArgumentException(
+          "Key " + k + " does not have the same row+column as buffered key " + key);
+    }
+    add(k.getTimestamp(), valueCopy);
+  }
+
+  /**
+   * Adds a Key and its value. When the buffer is empty, the first Key added sets the row+column.
+   * After this all Keys added must have the same row+column.
+   *
+   * @param k Key to be added to buffer
+   * @param v Value to be added to buffer; its bytes are copied
+   * @throws IllegalArgumentException if k has a different row+column than the buffered Keys
+   */
+  public void add(Key k, Value v) throws IllegalArgumentException {
+    add(k, v.get());
+  }
+
+  /**
+   * Clears the dest ColumnBuffer and copies into it every entry of this buffer whose timestamp
+   * passes the timestampTest. The row+column of this buffer is copied to dest even if no entry
+   * passes. Value arrays are shared between the two buffers, not copied.
+   *
+   * @param dest Destination ColumnBuffer; must not be this buffer
+   * @param timestampTest Test to determine which timestamps get added to dest
+   * @throws IllegalArgumentException if dest is this buffer
+   */
+  public void copyTo(ColumnBuffer dest, LongPredicate timestampTest) {
+    if (dest == this) {
+      // Clearing dest would also wipe the source entries before they are read.
+      throw new IllegalArgumentException("Cannot copy a ColumnBuffer into itself");
+    }
+    dest.clear();
+
+    if (key != null) {
+      dest.key = new Key(key);
+    }
+
+    for (int i = 0; i < size; i++) {
+      long time = timeStamps[i];
+      if (timestampTest.test(time)) {
+        dest.add(time, values.get(i));
+      }
+    }
+  }
+
+  /**
+   * Removes all entries and forgets the row+column, so the next add may use any Key. Allocated
+   * capacity is retained for reuse.
+   */
+  public void clear() {
+    size = 0;
+    values.clear();
+    key = null;
+  }
+
+  /**
+   * @return the number of entries in the buffer
+   */
+  public int size() {
+    return size;
+  }
+
+  /**
+   * @param pos Position of the Key that will be retrieved
+   * @return A new Key with the buffered row+column and the timestamp at the given position
+   * @throws IndexOutOfBoundsException if pos is not in [0, size())
+   */
+  public Key getKey(int pos) {
+    checkPosition(pos);
+    Key tmpKey = new Key(key);
+    tmpKey.setTimestamp(timeStamps[pos]);
+    return tmpKey;
+  }
+
+  /**
+   * @param pos Position of the Value that will be retrieved
+   * @return A Value wrapping the bytes at the given position. NOTE(review): to my knowledge
+   *         {@code new Value(byte[])} does not copy, so mutating the returned Value's bytes
+   *         would alter the buffer; confirm against the Accumulo version in use.
+   * @throws IndexOutOfBoundsException if pos is not in [0, size())
+   */
+  public Value getValue(int pos) {
+    checkPosition(pos);
+    return new Value(values.get(pos));
+  }
+
+  // Rejects positions outside the live region; timeStamps may have spare capacity past size.
+  private void checkPosition(int pos) {
+    if (pos < 0 || pos >= size) {
+      throw new IndexOutOfBoundsException("Position: " + pos + ", Size: " + size);
+    }
+  }
+}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
index 3d3ee0e..9ca23e9 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
+import java.util.function.LongPredicate;
import com.google.common.annotations.VisibleForTesting;
import org.apache.accumulo.core.client.IteratorSetting;
@@ -44,18 +45,6 @@ import org.apache.fluo.accumulo.values.WriteValue;
*/
public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Value> {
- private static class KeyValue extends SimpleImmutableEntry<Key, Value> {
- private static final long serialVersionUID = 1L;
-
- public KeyValue(Key key, Value value) {
- super(new Key(key), new Value(value));
- }
-
- public KeyValue(Key key, byte[] value) {
- super(new Key(key), new Value(value));
- }
- }
-
@VisibleForTesting
static final String GC_TIMESTAMP_OPT = "timestamp.gc";
@@ -65,8 +54,8 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
private Long gcTimestamp;
private SortedKeyValueIterator<Key, Value> source;
- private ArrayList<KeyValue> keys = new ArrayList<>();
- private ArrayList<KeyValue> keysFiltered = new ArrayList<>();
+ private ColumnBuffer keys = new ColumnBuffer();
+ private ColumnBuffer keysFiltered = new ColumnBuffer();
private HashSet<Long> completeTxs = new HashSet<>();
private HashSet<Long> rolledback = new HashSet<>();
private Key curCol = new Key();
@@ -77,11 +66,11 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
@Override
public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
IteratorEnvironment env) throws IOException {
+
if (env.getIteratorScope() == IteratorScope.scan) {
throw new IllegalArgumentException();
}
this.source = source;
-
isFullMajc = env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction();
String oats = options.get(GC_TIMESTAMP_OPT);
@@ -96,6 +85,7 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
}
}
+
@Override
public boolean hasTop() {
return position < keysFiltered.size() || source.hasTop();
@@ -191,7 +181,7 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
if (colType == ColumnConstants.TX_DONE_PREFIX) {
- keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+ keys.add(source.getTopKey(), source.getTopValue());
completeTxs.add(ts);
} else if (colType == ColumnConstants.WRITE_PREFIX) {
boolean keep = false;
@@ -224,7 +214,7 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
}
if (keep) {
- keys.add(new KeyValue(source.getTopKey(), val));
+ keys.add(source.getTopKey(), val);
} else if (complete) {
completeTxs.remove(ts);
}
@@ -249,13 +239,13 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
}
if (keep) {
- keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+ keys.add(source.getTopKey(), source.getTopValue());
} else if (complete) {
completeTxs.remove(txDoneTs);
}
} else if (colType == ColumnConstants.LOCK_PREFIX) {
if (ts > invalidationTime) {
- keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+ keys.add(source.getTopKey(), source.getTopValue());
}
} else if (colType == ColumnConstants.DATA_PREFIX) {
// can stop looking
@@ -263,7 +253,7 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
} else if (colType == ColumnConstants.ACK_PREFIX) {
if (!sawAck) {
if (ts >= firstWrite) {
- keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+ keys.add(source.getTopKey(), source.getTopValue());
}
sawAck = true;
}
@@ -274,22 +264,20 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
source.next();
}
- for (KeyValue kv : keys) {
- long colType = kv.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+ keys.copyTo(keysFiltered, (timestamp -> {
+ long colType = timestamp & ColumnConstants.PREFIX_MASK;
if (colType == ColumnConstants.TX_DONE_PREFIX) {
- if (completeTxs.contains(kv.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK)) {
- keysFiltered.add(kv);
- }
+ return completeTxs.contains(timestamp & ColumnConstants.TIMESTAMP_MASK);
} else {
- keysFiltered.add(kv);
+ return true;
}
- }
+ }));
}
@Override
public Key getTopKey() {
if (position < keysFiltered.size()) {
- return keysFiltered.get(position).getKey();
+ return keysFiltered.getKey(position);
} else {
return source.getTopKey();
}
@@ -298,7 +286,7 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
@Override
public Value getTopValue() {
if (position < keysFiltered.size()) {
- return keysFiltered.get(position).getValue();
+ return keysFiltered.getValue(position);
} else {
return source.getTopValue();
}
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/ColumnBufferTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/ColumnBufferTest.java
new file mode 100644
index 0000000..8dd797f
--- /dev/null
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/ColumnBufferTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.lang.IllegalArgumentException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ColumnBufferTest {
+
+ @Test
+ public void testDifferentKeys() {
+ ColumnBuffer columnBuffer = new ColumnBuffer();
+ columnBuffer.add(new Key("row1"), new Value());
+ try {
+ columnBuffer.add(new Key("row2"), new Value());
+ Assert.fail();
+ } catch (IllegalArgumentException e) {
+
+ }
+ }
+}
--
To stop receiving notification emails like this one, please contact
"commits@fluo.apache.org" <co...@fluo.apache.org>.