You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/12/05 16:41:05 UTC

incubator-fluo git commit: Added integration test for timestamp skipping iter (TSI). Also made TSI more robust to Accumulo changing in future.

Repository: incubator-fluo
Updated Branches:
  refs/heads/master c896fc16b -> f977834b8


Added integration test for timestamp skipping iter (TSI). Also made TSI more robust to Accumulo changing in future.


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/f977834b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/f977834b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/f977834b

Branch: refs/heads/master
Commit: f977834b876133528a160b769427439282cb1d7c
Parents: c896fc1
Author: Keith Turner <kt...@apache.org>
Authored: Mon Nov 21 20:57:12 2016 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Nov 22 10:32:39 2016 -0500

----------------------------------------------------------------------
 .../iterators/TimestampSkippingIterator.java    |  34 ++++--
 .../accumulo/Skip100StampsIterator.java         | 106 +++++++++++++++++++
 .../integration/accumulo/TimeskippingIT.java    |  71 +++++++++++++
 3 files changed, 205 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/f977834b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
index a09ac3b..ec2a83f 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
@@ -20,6 +20,7 @@ import java.lang.reflect.Field;
 import java.util.Collection;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
@@ -50,9 +51,12 @@ public class TimestampSkippingIterator implements SortedKeyValueIterator<Key, Va
   private boolean inclusive;
   private boolean hasSeeked = false;
 
+  private boolean removedDeletingIterator = false;
+  private int removalFailures = 0;
+
   private static final Logger log = LoggerFactory.getLogger(TimestampSkippingIterator.class);
 
-  TimestampSkippingIterator(SortedKeyValueIterator<Key, Value> source) {
+  public TimestampSkippingIterator(SortedKeyValueIterator<Key, Value> source) {
     this.source = source;
   }
 
@@ -78,7 +82,7 @@ public class TimestampSkippingIterator implements SortedKeyValueIterator<Key, Va
     while (source.hasTop()
         && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)
         && timestamp < source.getTopKey().getTimestamp()) {
-      if (count == 10) {
+      if (count == 10 && shouldSeek()) {
         // seek to prefix
         Key seekKey = new Key(curCol);
         seekKey.setTimestamp(timestamp);
@@ -148,20 +152,23 @@ public class TimestampSkippingIterator implements SortedKeyValueIterator<Key, Va
     }
   }
 
-  private static void setParent(SortedKeyValueIterator<Key, Value> iter,
+  private static boolean setParent(SortedKeyValueIterator<Key, Value> iter,
       SortedKeyValueIterator<Key, Value> newParent) {
     try {
       if (iter instanceof WrappingIterator) {
         Field field = WrappingIterator.class.getDeclaredField("source");
         field.setAccessible(true);
         field.set(iter, newParent);
+        return true;
       }
     } catch (NoSuchFieldException | IllegalArgumentException | IllegalAccessException e) {
       log.debug(e.getMessage(), e);
     }
+
+    return false;
   }
 
-  private static void removeDeletingIterator(SortedKeyValueIterator<Key, Value> source) {
+  private static boolean removeDeletingIterator(SortedKeyValueIterator<Key, Value> source) {
 
     SortedKeyValueIterator<Key, Value> prev = source;
     SortedKeyValueIterator<Key, Value> parent = getParent(source);
@@ -174,9 +181,21 @@ public class TimestampSkippingIterator implements SortedKeyValueIterator<Key, Va
     if (parent != null && parent instanceof DeletingIterator) {
       SortedKeyValueIterator<Key, Value> delParent = getParent(parent);
       if (delParent != null) {
-        setParent(prev, delParent);
+        return setParent(prev, delParent);
       }
     }
+
+    return false;
+  }
+
+  @VisibleForTesting
+  public final boolean shouldSeek() {
+    /*
+     * This method is a safety check to ensure the deleting iterator was removed. If that iterator
+     * was not removed for some reason, then the performance of seeking will be O(N^2). In the case
+     * where it's not removed, it would be better to just scan forward.
+     */
+    return !hasSeeked || removedDeletingIterator || removalFailures < 3;
   }
 
   private void seek(Range range) throws IOException {
@@ -185,7 +204,10 @@ public class TimestampSkippingIterator implements SortedKeyValueIterator<Key, Va
       // up iterators until the 1st seek. Therefore can only remove the deleting iter after the 1st
       // seek. Also, Accumulo may switch data sources and re-setup the deleting iterator, that's why
       // this iterator keeps trying to remove it.
-      removeDeletingIterator(source);
+      removedDeletingIterator |= removeDeletingIterator(source);
+      if (!removedDeletingIterator) {
+        removalFailures++;
+      }
     }
     source.seek(range, fams, inclusive);
     hasSeeked = true;

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/f977834b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/Skip100StampsIterator.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/Skip100StampsIterator.java b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/Skip100StampsIterator.java
new file mode 100644
index 0000000..aa8b2ef
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/Skip100StampsIterator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.accumulo;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.fluo.accumulo.iterators.TimestampSkippingIterator;
+
+public class Skip100StampsIterator implements SortedKeyValueIterator<Key, Value> {
+
+  private TimestampSkippingIterator source;
+  private boolean hasTop;
+  private int goodVal;
+  private int goodSeek;
+
+  @Override
+  public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
+      IteratorEnvironment env) throws IOException {
+
+    this.source = new TimestampSkippingIterator(source);
+
+  }
+
+  @Override
+  public boolean hasTop() {
+    return hasTop;
+  }
+
+  @Override
+  public void next() throws IOException {
+    hasTop = false;
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+      throws IOException {
+
+    source.seek(range, columnFamilies, inclusive);
+
+    Key k = new Key("r1", "f1", "q1");
+
+    long ts = 99999;
+    goodVal = 0;
+    hasTop = true;
+
+    /*
+     * If the TimestampSkippingIterator is not able to remove the DeletingIterator, then the
+     * following loop will have O(N^2) performance. This happens because every time the following
+     * loop seeks forward a little bit, the DeletingIterator scans from the start of the column
+     * looking for deletes. I manually commented out the code that removes the DeletingIterator and
+     * the following code took 50 to 100 times longer to run.
+     */
+
+    while (source.hasTop() && ts > 0) {
+      source.skipToTimestamp(k, ts);
+      if (source.hasTop()) {
+        if (source.getTopValue().toString().equals("v" + ts)) {
+          goodVal++;
+        }
+
+        if (source.shouldSeek()) {
+          goodSeek++;
+        }
+      }
+
+      ts -= 100;
+    }
+  }
+
+  @Override
+  public Key getTopKey() {
+    return new Key("r1", "f1", "q1", 42);
+  }
+
+  @Override
+  public Value getTopValue() {
+    return new Value(("" + goodVal + " " + goodSeek).getBytes());
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/f977834b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
new file mode 100644
index 0000000..68bb138
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.accumulo;
+
+import com.google.common.collect.Iterables;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.integration.ITBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TimeskippingIT extends ITBase {
+
+  private static final Logger log = LoggerFactory.getLogger(TimeskippingIT.class);
+
+  @Test
+  public void testTimestampSkippingIterPerformance() throws Exception {
+
+    conn.tableOperations().create("ttsi", false);
+
+    BatchWriter bw = conn.createBatchWriter("ttsi", new BatchWriterConfig());
+    Mutation m = new Mutation("r1");
+    for (int i = 0; i < 100000; i++) {
+      m.put("f1", "q1", i, "v" + i);
+    }
+
+    bw.addMutation(m);
+    bw.close();
+
+    long t2 = System.currentTimeMillis();
+
+    Scanner scanner = conn.createScanner("ttsi", Authorizations.EMPTY);
+    scanner.addScanIterator(new IteratorSetting(10, Skip100StampsIterator.class));
+
+    Assert.assertEquals("999 1000", Iterables.getOnlyElement(scanner).getValue().toString());
+    long t3 = System.currentTimeMillis();
+
+    if (t3 - t2 > 3000) {
+      log.warn("Timestamp skipping iterator took longer than expected " + (t3 - t2));
+    }
+
+    conn.tableOperations().flush("ttsi", null, null, true);
+
+    long t4 = System.currentTimeMillis();
+    Assert.assertEquals("999 1000", Iterables.getOnlyElement(scanner).getValue().toString());
+    long t5 = System.currentTimeMillis();
+
+    if (t5 - t4 > 3000) {
+      log.warn("Timestamp skipping iterator took longer than expected " + (t5 - t4));
+    }
+  }
+}