You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2017/10/26 14:25:33 UTC

[fluo] branch master updated: fixes #946 Added mem efficient col buffer for GCiter (#952)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new 51dc912  fixes #946 Added mem efficient col buffer for GCiter (#952)
51dc912 is described below

commit 51dc9123210998c0548f8177ca5efc57ecce405a
Author: Joseph Koshakow <ko...@gmail.com>
AuthorDate: Thu Oct 26 10:22:11 2017 -0400

    fixes #946 Added mem efficient col buffer for GCiter (#952)
---
 .../fluo/accumulo/iterators/ColumnBuffer.java      | 138 +++++++++++++++++++++
 .../iterators/GarbageCollectionIterator.java       |  46 +++----
 .../fluo/accumulo/iterators/ColumnBufferTest.java  |  38 ++++++
 3 files changed, 193 insertions(+), 29 deletions(-)

diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java
new file mode 100644
index 0000000..97be335
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.lang.IllegalArgumentException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.function.LongPredicate;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+
+/**
+ * Buffers Keys that all share the same row+column.  Internally it stores only one Key, a
+ * list of timestamps and a list of value byte arrays.  getKey and getValue materialize each
+ * Key and Value on demand.
+ */
+class ColumnBuffer {
+
+  private Key key;
+  private ArrayList<Long> timeStamps;
+  private ArrayList<byte[]> values;
+
+  public ColumnBuffer() {
+    // starts empty; the first call to add(Key, ...) sets the shared key
+    this.key = null;
+    this.timeStamps = new ArrayList<>();
+    this.values = new ArrayList<>();
+  }
+
+  /**
+   * @param timestamp Timestamp to be added to buffer
+   * @param v Value to be added to buffer (stored as passed, without copying or key checks)
+   */
+  private void add(long timestamp, byte[] v) {
+
+    timeStamps.add(timestamp);
+    values.add(v);
+  }
+
+  /**
+   * When empty, the first key added sets the row+column.  After this all keys added must have
+   * the same row+column (and visibility), otherwise an IllegalArgumentException is thrown.
+   *
+   * @param k Key to be added to buffer
+   * @param vByte Value bytes to be added to buffer; a copy of the array is stored
+   */
+  public void add(Key k, byte[] vByte) throws IllegalArgumentException {
+    vByte = Arrays.copyOf(vByte, vByte.length);
+
+    if (key == null) {
+      key = new Key(k);
+      add(k.getTimestamp(), vByte);
+    } else if (key.equals(k, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+      add(k.getTimestamp(), vByte);
+    } else {
+      throw new IllegalArgumentException();
+    }
+  }
+
+  /**
+   * Convenience overload that delegates to add(Key, byte[]) with the bytes of v, so the same
+   * row+column rule applies and an IllegalArgumentException is thrown on a mismatch.
+   *
+   * @param k Key to be added to buffer
+   * @param v Value to be added to buffer
+   */
+  public void add(Key k, Value v) throws IllegalArgumentException {
+    add(k, v.get());
+  }
+
+  /**
+   * Clears the dest ColumnBuffer and copies into it all entries of this buffer whose timestamp
+   * passes the timestampTest.  The value byte arrays are passed to dest as-is, not copied.
+   *
+   * @param dest Destination ColumnBuffer
+   * @param timestampTest Test to determine which timestamps get added to dest
+   */
+  public void copyTo(ColumnBuffer dest, LongPredicate timestampTest) {
+    dest.clear();
+
+    if (key != null) {
+      dest.key = new Key(key);
+    }
+
+    for (int i = 0; i < timeStamps.size(); i++) {
+      long time = timeStamps.get(i);
+      if (timestampTest.test(time)) {
+        dest.add(time, values.get(i));
+      }
+    }
+  }
+
+  public void clear() {
+    timeStamps.clear();
+    values.clear();
+    key = null;
+  }
+
+  /**
+   * @return the number of timestamp/value entries currently in the buffer
+   */
+  public int size() {
+    return timeStamps.size();
+  }
+
+  /**
+   * @param pos Position of the Key that will be retrieved
+   * @return A new Key copied from the buffered key, with the timestamp stored at pos
+   */
+  public Key getKey(int pos) {
+    Key tmpKey = new Key(key);
+    tmpKey.setTimestamp(timeStamps.get(pos));
+    return tmpKey;
+  }
+
+  /**
+   * @param pos Position of the Value that will be retrieved
+   * @return A new Value created from the bytes stored at pos
+   */
+  public Value getValue(int pos) {
+    return new Value(values.get(pos));
+  }
+}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
index 3d3ee0e..9ca23e9 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.function.LongPredicate;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -44,18 +45,6 @@ import org.apache.fluo.accumulo.values.WriteValue;
  */
 public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Value> {
 
-  private static class KeyValue extends SimpleImmutableEntry<Key, Value> {
-    private static final long serialVersionUID = 1L;
-
-    public KeyValue(Key key, Value value) {
-      super(new Key(key), new Value(value));
-    }
-
-    public KeyValue(Key key, byte[] value) {
-      super(new Key(key), new Value(value));
-    }
-  }
-
   @VisibleForTesting
   static final String GC_TIMESTAMP_OPT = "timestamp.gc";
 
@@ -65,8 +54,8 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
   private Long gcTimestamp;
   private SortedKeyValueIterator<Key, Value> source;
 
-  private ArrayList<KeyValue> keys = new ArrayList<>();
-  private ArrayList<KeyValue> keysFiltered = new ArrayList<>();
+  private ColumnBuffer keys = new ColumnBuffer();
+  private ColumnBuffer keysFiltered = new ColumnBuffer();
   private HashSet<Long> completeTxs = new HashSet<>();
   private HashSet<Long> rolledback = new HashSet<>();
   private Key curCol = new Key();
@@ -77,11 +66,11 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
   @Override
   public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
       IteratorEnvironment env) throws IOException {
+
     if (env.getIteratorScope() == IteratorScope.scan) {
       throw new IllegalArgumentException();
     }
     this.source = source;
-
     isFullMajc = env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction();
 
     String oats = options.get(GC_TIMESTAMP_OPT);
@@ -96,6 +85,7 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
     }
   }
 
+
   @Override
   public boolean hasTop() {
     return position < keysFiltered.size() || source.hasTop();
@@ -191,7 +181,7 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
       long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
 
       if (colType == ColumnConstants.TX_DONE_PREFIX) {
-        keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+        keys.add(source.getTopKey(), source.getTopValue());
         completeTxs.add(ts);
       } else if (colType == ColumnConstants.WRITE_PREFIX) {
         boolean keep = false;
@@ -224,7 +214,7 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
         }
 
         if (keep) {
-          keys.add(new KeyValue(source.getTopKey(), val));
+          keys.add(source.getTopKey(), val);
         } else if (complete) {
           completeTxs.remove(ts);
         }
@@ -249,13 +239,13 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
         }
 
         if (keep) {
-          keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+          keys.add(source.getTopKey(), source.getTopValue());
         } else if (complete) {
           completeTxs.remove(txDoneTs);
         }
       } else if (colType == ColumnConstants.LOCK_PREFIX) {
         if (ts > invalidationTime) {
-          keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+          keys.add(source.getTopKey(), source.getTopValue());
         }
       } else if (colType == ColumnConstants.DATA_PREFIX) {
         // can stop looking
@@ -263,7 +253,7 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
       } else if (colType == ColumnConstants.ACK_PREFIX) {
         if (!sawAck) {
           if (ts >= firstWrite) {
-            keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+            keys.add(source.getTopKey(), source.getTopValue());
           }
           sawAck = true;
         }
@@ -274,22 +264,20 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
       source.next();
     }
 
-    for (KeyValue kv : keys) {
-      long colType = kv.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+    keys.copyTo(keysFiltered, (timestamp -> {
+      long colType = timestamp & ColumnConstants.PREFIX_MASK;
       if (colType == ColumnConstants.TX_DONE_PREFIX) {
-        if (completeTxs.contains(kv.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK)) {
-          keysFiltered.add(kv);
-        }
+        return completeTxs.contains(timestamp & ColumnConstants.TIMESTAMP_MASK);
       } else {
-        keysFiltered.add(kv);
+        return true;
       }
-    }
+    }));
   }
 
   @Override
   public Key getTopKey() {
     if (position < keysFiltered.size()) {
-      return keysFiltered.get(position).getKey();
+      return keysFiltered.getKey(position);
     } else {
       return source.getTopKey();
     }
@@ -298,7 +286,7 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
   @Override
   public Value getTopValue() {
     if (position < keysFiltered.size()) {
-      return keysFiltered.get(position).getValue();
+      return keysFiltered.getValue(position);
     } else {
       return source.getTopValue();
     }
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/ColumnBufferTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/ColumnBufferTest.java
new file mode 100644
index 0000000..8dd797f
--- /dev/null
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/ColumnBufferTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.lang.IllegalArgumentException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ColumnBufferTest {
+  // Checks that ColumnBuffer rejects a key whose row differs from the first key added.
+  @Test
+  public void testDifferentKeys() {
+    ColumnBuffer columnBuffer = new ColumnBuffer();
+    columnBuffer.add(new Key("row1"), new Value());
+    try {
+      columnBuffer.add(new Key("row2"), new Value());
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      // expected: the second key has a different row than the first
+    }
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
commits@fluo.apache.org.