You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by cj...@apache.org on 2015/01/08 05:40:44 UTC

[2/4] accumulo git commit: ACCUMULO-2641 adds scan id as optional thrift parameter so that getActiveScans(tserver) getScanid() returns unique value with MAC test.

ACCUMULO-2641 adds scan id as optional thrift parameter so that getActiveScans(tserver) getScanid() returns unique value with MAC test.

Signed-off-by: Corey J. Nolet <cj...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ade42586
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ade42586
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ade42586

Branch: refs/heads/master
Commit: ade425862446e358a051190bac9cdc0bc2e80679
Parents: 34dda79
Author: Ed Coleman <de...@etcoleman.com>
Authored: Sat Jan 3 14:01:11 2015 -0500
Committer: Corey J. Nolet <cj...@gmail.com>
Committed: Wed Jan 7 23:08:48 2015 -0500

----------------------------------------------------------------------
 .../core/client/impl/ActiveScanImpl.java        |   5 +-
 core/src/main/thrift/tabletserver.thrift        |   1 +
 .../apache/accumulo/tserver/TabletServer.java   |   8 +-
 .../accumulo/test/functional/ScanIdIT.java      | 360 +++++++++++++++++++
 .../org/apache/accumulo/trace/thrift/TInfo.java |  10 +-
 5 files changed, 375 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ade42586/core/src/main/java/org/apache/accumulo/core/client/impl/ActiveScanImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ActiveScanImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ActiveScanImpl.java
index 4f6fa33..0f0e64c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ActiveScanImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ActiveScanImpl.java
@@ -37,7 +37,7 @@ import org.apache.accumulo.core.security.Authorizations;
  */
 public class ActiveScanImpl extends ActiveScan {
 
-  private long scanid;
+  private long scanId;
   private String client;
   private String table;
   private long age;
@@ -52,6 +52,7 @@ public class ActiveScanImpl extends ActiveScan {
   private Authorizations authorizations;
 
   ActiveScanImpl(Instance instance, org.apache.accumulo.core.tabletserver.thrift.ActiveScan activeScan) throws TableNotFoundException {
+    this.scanId = activeScan.scanId;
     this.client = activeScan.client;
     this.user = activeScan.user;
     this.age = activeScan.age;
@@ -76,7 +77,7 @@ public class ActiveScanImpl extends ActiveScan {
 
   @Override
   public long getScanid() {
-    return scanid;
+    return scanId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ade42586/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 25e0b10..512f0c0 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -85,6 +85,7 @@ struct ActiveScan {
     11:list<data.IterInfo> ssiList
     12:map<string, map<string, string>> ssio  /* Server Side Iterator Options */
     13:list<binary> authorizations
+    14:optional i64 scanId
 }
 
 enum CompactionType {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ade42586/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index e6fb417..ec6d9b3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -605,8 +605,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
             }
           }
 
-          activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
-              state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
+          ActiveScan activeScan = new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
+              state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB());
+
+          // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in ActiveScan constructor
+          activeScan.setScanId(entry.getKey());
+          activeScans.add(activeScan);
 
         } else if (session instanceof MultiScanSession) {
           MultiScanSession mss = (MultiScanSession) session;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ade42586/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
new file mode 100644
index 0000000..178cb30
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * ACCUMULO-2641 Integration test. ACCUMULO-2641 Adds scan id to thrift protocol so that {@code org.apache.accumulo.core.client.admin.ActiveScan.getScanid()}
+ * returns a unique scan id.
+ * <p/>
+ * The test uses the Minicluster and the {@code org.apache.accumulo.test.functional.SlowIterator} to create multiple scan sessions.
+ * The test exercises multiple tablet servers with splits and multiple ranges to force the scans to occur across multiple tablet servers
+ * for completeness.
+ * <p/>
+ * This patch modified thrift; the TraceRepoDeserializationTest test seems to fail unless the following is added:
+ * <p/>
+ * private static final long serialVersionUID = -4659975753252858243l;
+ * <p/>
+ * back into org.apache.accumulo.trace.thrift.TInfo until that test signature is regenerated.
+ */
+public class ScanIdIT extends AccumuloClusterIT {
+
+  private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
+
+  private static final int NUM_SCANNERS = 8;
+
+  private static final int NUM_DATA_ROWS = 100;
+
+  private static final Random random = new Random();
+
+  private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
+
+  private static volatile boolean testInProgress = true;
+
+  private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<Integer,Value>();
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  /**
+   * @throws Exception any exception is a test failure.
+   */
+  @Test
+  public void testScanId() throws Exception {
+
+    final String tableName = getUniqueNames(1)[0];
+    Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+
+    addSplits(conn, tableName);
+
+    generateSampleData(conn, tableName);
+
+    attachSlowIterator(conn, tableName);
+
+    for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
+      ScannerThread st = new ScannerThread(conn, scannerIndex, tableName);
+      pool.submit(st);
+    }
+
+    // wait for scanners to report a result.
+    while (testInProgress) {
+
+      if (resultsByWorker.size() < NUM_SCANNERS) {
+        log.trace("Results reported {}", resultsByWorker.size());
+        UtilWaitThread.sleep(750);
+      } else {
+        // each worker has reported at least one result.
+        testInProgress = false;
+
+        log.debug("Final result count {}", resultsByWorker.size());
+
+        // delay to allow scanners to react to end of test and cleanly close.
+        UtilWaitThread.sleep(1000);
+      }
+
+    }
+
+    // all scanners have reported at least 1 result, so check for unique scan ids.
+    Set<Long> scanIds = new HashSet<Long>();
+
+    List<String> tservers = conn.instanceOperations().getTabletServers();
+
+    log.debug("tablet servers {}", tservers.toString());
+
+    for (String tserver : tservers) {
+
+      List<ActiveScan> activeScans = conn.instanceOperations().getActiveScans(tserver);
+
+      log.debug("TServer {} has {} active scans", tserver, activeScans.size());
+
+      for (ActiveScan scan : activeScans) {
+        log.debug("Tserver {} scan id {}", tserver, scan.getScanid());
+        scanIds.add(scan.getScanid());
+      }
+    }
+
+    assertTrue(NUM_SCANNERS <= scanIds.size());
+
+  }
+
+  /**
+   * Runs scanner in separate thread to allow multiple scanners to execute in parallel.
+   * <p/>
+   * The thread run method is terminated when the testInProgress flag is set to false.
+   */
+  private static class ScannerThread implements Runnable {
+
+    private final Connector connector;
+    private Scanner scanner = null;
+    private final int workerIndex;
+    private final String tablename;
+
+    public ScannerThread(final Connector connector, final int workerIndex, final String tablename) {
+
+      this.connector = connector;
+      this.workerIndex = workerIndex;
+      this.tablename = tablename;
+
+    }
+
+    /**
+     * execute the scan across the sample data and put scan result into result map until
+     * testInProgress flag is set to false.
+     */
+    @Override public void run() {
+
+      /*
+      * set random initial delay of up to 5000 ms to
+      * allow scanners to proceed to different points.
+      */
+
+      long delay = random.nextInt(5000);
+
+      log.trace("Start delay for worker thread {} is {}", workerIndex, delay);
+
+      UtilWaitThread.sleep(delay);
+
+      try {
+
+        scanner = connector.createScanner(tablename, new Authorizations());
+
+        // Never start readahead
+        scanner.setReadaheadThreshold(Long.MAX_VALUE);
+        scanner.setBatchSize(1);
+
+        // create different ranges to try to hit more than one tablet.
+        scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
+
+      } catch (TableNotFoundException e) {
+        throw new IllegalStateException("Initialization failure. Could not create scanner", e);
+      }
+
+      scanner.fetchColumnFamily(new Text("fam1"));
+
+      for (Map.Entry<Key,Value> entry : scanner) {
+
+        // exit when success condition is met.
+        if (!testInProgress) {
+          scanner.clearScanIterators();
+          scanner.close();
+
+          return;
+        }
+
+        Text row = entry.getKey().getRow();
+
+        log.trace("worker {}, row {}", workerIndex, row.toString());
+
+        if (entry.getValue() != null) {
+
+          Value prevValue = resultsByWorker.put(workerIndex, entry.getValue());
+
+          // value should always be increasing; NOTE(review): the assert below requires prevValue > current value - confirm intent
+          if (prevValue != null) {
+
+            log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s", prevValue, entry.getValue()));
+
+            assertTrue(prevValue.compareTo(entry.getValue()) > 0);
+          }
+        } else {
+          log.info("Scanner returned null");
+          fail("Scanner returned unexpected null value");
+        }
+
+      }
+
+      log.debug("Scanner ran out of data. (info only, not an error) ");
+
+    }
+  }
+
+  /**
+   * Create splits on the table and force migration by taking the table offline and then bringing it back
+   * online for the test.
+   *
+   * @param conn Accumulo connector to test cluster or MAC instance.
+   */
+  private void addSplits(final Connector conn, final String tableName) {
+
+    SortedSet<Text> splits = createSplits();
+
+    try {
+
+      conn.tableOperations().addSplits(tableName, splits);
+
+      conn.tableOperations().offline(tableName, true);
+
+      UtilWaitThread.sleep(2000);
+      conn.tableOperations().online(tableName, true);
+
+      for (Text split : conn.tableOperations().listSplits(tableName)) {
+        log.trace("Split {}", split);
+      }
+
+    } catch (AccumuloSecurityException e) {
+      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+    } catch (TableNotFoundException e) {
+      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+    } catch (AccumuloException e) {
+      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+    }
+
+  }
+
+  /**
+   * Create splits to distribute data across multiple tservers.
+   *
+   * @return splits in sorted set for addSplits.
+   */
+  private SortedSet<Text> createSplits() {
+
+    SortedSet<Text> splits = new TreeSet<Text>();
+
+    for (int split = 0; split < 10; split++) {
+      splits.add(new Text(Integer.toString(split)));
+    }
+
+    return splits;
+  }
+
+  /**
+   * Generate some sample data using random row id to distribute across splits.
+   * <p/>
+   * The primary goal is to determine that each scanner is assigned a unique scan id.
+   * This test does check that the count value for fam1 increases if a scanner reads multiple values, but this is
+   * a secondary consideration for this test that is included for completeness.
+   *
+   * @param connector Accumulo connector to test cluster or MAC instance.
+   */
+  private void generateSampleData(Connector connector, final String tablename) {
+
+    try {
+
+      BatchWriter bw = connector.createBatchWriter(tablename, new BatchWriterConfig());
+
+      ColumnVisibility vis = new ColumnVisibility("public");
+
+      for (int i = 0; i < NUM_DATA_ROWS; i++) {
+
+        Text rowId = new Text(String.format("%d", ((random.nextInt(10) * 100) + i)));
+
+        Mutation m = new Mutation(rowId);
+        m.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(UTF_8)));
+        m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS - i).getBytes(UTF_8)));
+        m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i - NUM_DATA_ROWS).getBytes(UTF_8)));
+
+        log.trace("Added row {}", rowId);
+
+        bw.addMutation(m);
+      }
+
+      bw.close();
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Initialization failed. Could not create test data", ex);
+    } catch (MutationsRejectedException ex) {
+      throw new IllegalStateException("Initialization failed. Could not create test data", ex);
+    }
+  }
+
+  /**
+   * Attach the test slow iterator so that we have time to read the scan id without creating a large dataset. Uses
+   * fairly large sleep and delay times because we are not concerned with how much data is read and we do not read
+   * all of the data - the test stops once each scanner reports a scan id.
+   *
+   * @param connector Accumulo connector to test cluster or MAC instance.
+   */
+  private void attachSlowIterator(Connector connector, final String tablename) {
+    try {
+
+      IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
+      slowIter.addOption("sleepTime", "200");
+      slowIter.addOption("seekSleepTime", "200");
+
+      connector.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan));
+
+    } catch (AccumuloException ex) {
+      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+    } catch (AccumuloSecurityException ex) {
+      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ade42586/trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java
----------------------------------------------------------------------
diff --git a/trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java b/trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java
index 1046149..6549aa2 100644
--- a/trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java
+++ b/trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java
@@ -49,11 +49,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("all") public class TInfo implements org.apache.thrift.TBase<TInfo, TInfo._Fields>, java.io.Serializable, Cloneable, Comparable<TInfo> {
-
-  //ACCUMULO-3132
-  //Total hack around the serialization of TInfo into zookeeper
-  private static final long serialVersionUID = -4659975753252858243l;
-
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TInfo");
 
   private static final org.apache.thrift.protocol.TField TRACE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("traceId", org.apache.thrift.protocol.TType.I64, (short)1);
@@ -68,6 +63,11 @@ import org.slf4j.LoggerFactory;
   public long traceId; // required
   public long parentId; // required
 
+  // ACCUMULO-2641 modified thrift by adding optional scanId, which changes this signature.
+  // This can be removed once org.apache.accumulo.master.state.TraceRepoDeserializationTest
+  // data is regenerated.
+  private static final long serialVersionUID = -4659975753252858243L;
+
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
     TRACE_ID((short)1, "traceId"),