You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@fluo.apache.org by GitBox <gi...@apache.org> on 2017/10/26 14:28:05 UTC

[GitHub] keith-turner closed pull request #952: Issue 946

keith-turner closed pull request #952: Issue 946
URL: https://github.com/apache/fluo/pull/952
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java
new file mode 100644
index 00000000..97be335d
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/ColumnBuffer.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.lang.IllegalArgumentException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.function.LongPredicate;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+
+/**
+ * This class buffers Keys that all have the same row+column.  Internally 
+ * it only stores one Key, a list of timestamps and a list of values.  At iteration 
+ * time it materializes each Key+Value.
+ */
+class ColumnBuffer {
+
+  private Key key;
+  private ArrayList<Long> timeStamps;
+  private ArrayList<byte[]> values;
+
+  public ColumnBuffer() {
+
+    this.key = null;
+    this.timeStamps = new ArrayList<>();
+    this.values = new ArrayList<>();
+  }
+
+  /**
+   * @param timestamp Timestamp to be added to buffer
+   * @param v Value to be added to buffer
+   */
+  private void add(long timestamp, byte[] v) {
+
+    timeStamps.add(timestamp);
+    values.add(v);
+  }
+
+  /**
+   * When empty, the first key added sets the row+column.  After this all keys
+   * added must have the same row+column.
+   *
+   * @param k Key to be added to buffer
+   * @param vByte Value bytes to be added to buffer (the array is copied before it is stored)
+   */
+  public void add(Key k, byte[] vByte) throws IllegalArgumentException {
+    vByte = Arrays.copyOf(vByte, vByte.length);
+
+    if (key == null) {
+      key = new Key(k);
+      add(k.getTimestamp(), vByte);
+    } else if (key.equals(k, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+      add(k.getTimestamp(), vByte);
+    } else {
+      throw new IllegalArgumentException();
+    }
+  }
+
+  /**
+   * When empty, the first key added sets the row+column.  After this all keys
+   * added must have the same row+column.
+   *
+   * @param k Key to be added to buffer
+   * @param v Value to be added to buffer
+   */
+  public void add(Key k, Value v) throws IllegalArgumentException {
+    add(k, v.get());
+  }
+
+  /**
+   * Clears the dest ColumnBuffer and copies into it all entries from this buffer whose
+   * timestamp passes the timestampTest.
+   *
+   * @param dest Destination ColumnBuffer
+   * @param timestampTest Test to determine which timestamps get added to dest
+   */
+  public void copyTo(ColumnBuffer dest, LongPredicate timestampTest) {
+    dest.clear();
+
+    if (key != null) {
+      dest.key = new Key(key);
+    }
+
+    for (int i = 0; i < timeStamps.size(); i++) {
+      long time = timeStamps.get(i);
+      if (timestampTest.test(time)) {
+        dest.add(time, values.get(i));
+      }
+    }
+  }
+
+  public void clear() {
+    timeStamps.clear();
+    values.clear();
+    key = null;
+  }
+
+  /**
+   * @return the size of the current buffer
+   */
+  public int size() {
+    return timeStamps.size();
+  }
+
+  /**
+   * @param pos Position of the Key that will be retrieved 
+   * @return The key at a given position
+   */
+  public Key getKey(int pos) {
+    Key tmpKey = new Key(key);
+    tmpKey.setTimestamp(timeStamps.get(pos));
+    return tmpKey;
+  }
+
+  /**
+   * @param pos Position of the Value that will be retrieved
+   * @return The value at a given position
+   */
+  public Value getValue(int pos) {
+    return new Value(values.get(pos));
+  }
+}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
index 3d3ee0e6..9ca23e9f 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
@@ -21,6 +21,7 @@
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.function.LongPredicate;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -44,18 +45,6 @@
  */
 public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Value> {
 
-  private static class KeyValue extends SimpleImmutableEntry<Key, Value> {
-    private static final long serialVersionUID = 1L;
-
-    public KeyValue(Key key, Value value) {
-      super(new Key(key), new Value(value));
-    }
-
-    public KeyValue(Key key, byte[] value) {
-      super(new Key(key), new Value(value));
-    }
-  }
-
   @VisibleForTesting
   static final String GC_TIMESTAMP_OPT = "timestamp.gc";
 
@@ -65,8 +54,8 @@ public KeyValue(Key key, byte[] value) {
   private Long gcTimestamp;
   private SortedKeyValueIterator<Key, Value> source;
 
-  private ArrayList<KeyValue> keys = new ArrayList<>();
-  private ArrayList<KeyValue> keysFiltered = new ArrayList<>();
+  private ColumnBuffer keys = new ColumnBuffer();
+  private ColumnBuffer keysFiltered = new ColumnBuffer();
   private HashSet<Long> completeTxs = new HashSet<>();
   private HashSet<Long> rolledback = new HashSet<>();
   private Key curCol = new Key();
@@ -77,11 +66,11 @@ public KeyValue(Key key, byte[] value) {
   @Override
   public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
       IteratorEnvironment env) throws IOException {
+
     if (env.getIteratorScope() == IteratorScope.scan) {
       throw new IllegalArgumentException();
     }
     this.source = source;
-
     isFullMajc = env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction();
 
     String oats = options.get(GC_TIMESTAMP_OPT);
@@ -96,6 +85,7 @@ public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String>
     }
   }
 
+
   @Override
   public boolean hasTop() {
     return position < keysFiltered.size() || source.hasTop();
@@ -191,7 +181,7 @@ private void readColMetadata() throws IOException {
       long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
 
       if (colType == ColumnConstants.TX_DONE_PREFIX) {
-        keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+        keys.add(source.getTopKey(), source.getTopValue());
         completeTxs.add(ts);
       } else if (colType == ColumnConstants.WRITE_PREFIX) {
         boolean keep = false;
@@ -224,7 +214,7 @@ private void readColMetadata() throws IOException {
         }
 
         if (keep) {
-          keys.add(new KeyValue(source.getTopKey(), val));
+          keys.add(source.getTopKey(), val);
         } else if (complete) {
           completeTxs.remove(ts);
         }
@@ -249,13 +239,13 @@ private void readColMetadata() throws IOException {
         }
 
         if (keep) {
-          keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+          keys.add(source.getTopKey(), source.getTopValue());
         } else if (complete) {
           completeTxs.remove(txDoneTs);
         }
       } else if (colType == ColumnConstants.LOCK_PREFIX) {
         if (ts > invalidationTime) {
-          keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+          keys.add(source.getTopKey(), source.getTopValue());
         }
       } else if (colType == ColumnConstants.DATA_PREFIX) {
         // can stop looking
@@ -263,7 +253,7 @@ private void readColMetadata() throws IOException {
       } else if (colType == ColumnConstants.ACK_PREFIX) {
         if (!sawAck) {
           if (ts >= firstWrite) {
-            keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
+            keys.add(source.getTopKey(), source.getTopValue());
           }
           sawAck = true;
         }
@@ -274,22 +264,20 @@ private void readColMetadata() throws IOException {
       source.next();
     }
 
-    for (KeyValue kv : keys) {
-      long colType = kv.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
+    keys.copyTo(keysFiltered, (timestamp -> {
+      long colType = timestamp & ColumnConstants.PREFIX_MASK;
       if (colType == ColumnConstants.TX_DONE_PREFIX) {
-        if (completeTxs.contains(kv.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK)) {
-          keysFiltered.add(kv);
-        }
+        return completeTxs.contains(timestamp & ColumnConstants.TIMESTAMP_MASK);
       } else {
-        keysFiltered.add(kv);
+        return true;
       }
-    }
+    }));
   }
 
   @Override
   public Key getTopKey() {
     if (position < keysFiltered.size()) {
-      return keysFiltered.get(position).getKey();
+      return keysFiltered.getKey(position);
     } else {
       return source.getTopKey();
     }
@@ -298,7 +286,7 @@ public Key getTopKey() {
   @Override
   public Value getTopValue() {
     if (position < keysFiltered.size()) {
-      return keysFiltered.get(position).getValue();
+      return keysFiltered.getValue(position);
     } else {
       return source.getTopValue();
     }
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/ColumnBufferTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/ColumnBufferTest.java
new file mode 100644
index 00000000..8dd797fe
--- /dev/null
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/ColumnBufferTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.accumulo.iterators;
+
+import java.lang.IllegalArgumentException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ColumnBufferTest {
+
+  @Test
+  public void testDifferentKeys() {
+    ColumnBuffer columnBuffer = new ColumnBuffer();
+    columnBuffer.add(new Key("row1"), new Value());
+    try {
+      columnBuffer.add(new Key("row2"), new Value());
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+
+    }
+  }
+}
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java
index a1600d86..21ee725f 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIteratorTest.java
@@ -39,9 +39,11 @@ GarbageCollectionIterator newGCI(TestData input, long oldestActive, boolean full
 
     try {
       gci.init(new SortedMapIterator(input.data), options, env);
+
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+
     return gci;
   }
 
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
index 2ea42b3b..2aaa624b 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
@@ -45,6 +45,7 @@
   }
 
   TestData(SortedKeyValueIterator<Key, Value> iter, Range range) {
+
     try {
       iter.seek(range, new HashSet<ByteSequence>(), false);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services