You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by cj...@apache.org on 2015/01/08 05:40:44 UTC
[2/4] accumulo git commit: ACCUMULO-2641 adds scan id as optional
thrift parameter so that getActiveScans(tserver) getScanid() returns unique
value with MAC test.
ACCUMULO-2641 adds scan id as optional thrift parameter so that getActiveScans(tserver) getScanid() returns unique value with MAC test.
Signed-off-by: Corey J. Nolet <cj...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ade42586
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ade42586
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ade42586
Branch: refs/heads/master
Commit: ade425862446e358a051190bac9cdc0bc2e80679
Parents: 34dda79
Author: Ed Coleman <de...@etcoleman.com>
Authored: Sat Jan 3 14:01:11 2015 -0500
Committer: Corey J. Nolet <cj...@gmail.com>
Committed: Wed Jan 7 23:08:48 2015 -0500
----------------------------------------------------------------------
.../core/client/impl/ActiveScanImpl.java | 5 +-
core/src/main/thrift/tabletserver.thrift | 1 +
.../apache/accumulo/tserver/TabletServer.java | 8 +-
.../accumulo/test/functional/ScanIdIT.java | 360 +++++++++++++++++++
.../org/apache/accumulo/trace/thrift/TInfo.java | 10 +-
5 files changed, 375 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ade42586/core/src/main/java/org/apache/accumulo/core/client/impl/ActiveScanImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ActiveScanImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ActiveScanImpl.java
index 4f6fa33..0f0e64c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ActiveScanImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ActiveScanImpl.java
@@ -37,7 +37,7 @@ import org.apache.accumulo.core.security.Authorizations;
*/
public class ActiveScanImpl extends ActiveScan {
- private long scanid;
+ private long scanId;
private String client;
private String table;
private long age;
@@ -52,6 +52,7 @@ public class ActiveScanImpl extends ActiveScan {
private Authorizations authorizations;
ActiveScanImpl(Instance instance, org.apache.accumulo.core.tabletserver.thrift.ActiveScan activeScan) throws TableNotFoundException {
+ this.scanId = activeScan.scanId;
this.client = activeScan.client;
this.user = activeScan.user;
this.age = activeScan.age;
@@ -76,7 +77,7 @@ public class ActiveScanImpl extends ActiveScan {
@Override
public long getScanid() {
- return scanid;
+ return scanId;
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ade42586/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 25e0b10..512f0c0 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -85,6 +85,7 @@ struct ActiveScan {
11:list<data.IterInfo> ssiList
12:map<string, map<string, string>> ssio /* Server Side Iterator Options */
13:list<binary> authorizations
+ 14:optional i64 scanId
}
enum CompactionType {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ade42586/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index e6fb417..ec6d9b3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -605,8 +605,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
- activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
- state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
+ ActiveScan activeScan = new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
+ state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB());
+
+ // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in ActiveScan constructor
+ activeScan.setScanId(entry.getKey());
+ activeScans.add(activeScan);
} else if (session instanceof MultiScanSession) {
MultiScanSession mss = (MultiScanSession) session;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ade42586/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
new file mode 100644
index 0000000..178cb30
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * ACCUMULO-2641 integration test. ACCUMULO-2641 adds the scan id to the thrift protocol so that
+ * {@code org.apache.accumulo.core.client.admin.ActiveScan.getScanid()} returns a unique scan id.
+ * <p/>
+ * The test uses the Minicluster and the {@code org.apache.accumulo.test.functional.SlowIterator} to create multiple scan sessions.
+ * The test uses splits and multiple ranges to try to force the scans to occur across multiple tablet servers
+ * for completeness.
+ * <p/>
+ * This patch modified thrift, and the TraceRepoDeserializationTest test seems to fail unless the following is added
+ * <p/>
+ * {@code private static final long serialVersionUID = -4659975753252858243L;}
+ * <p/>
+ * back into org.apache.accumulo.trace.thrift.TInfo until that test signature is regenerated.
+ */
+public class ScanIdIT extends AccumuloClusterIT {
+
+ private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
+
+ private static final int NUM_SCANNERS = 8;
+
+ private static final int NUM_DATA_ROWS = 100;
+
+ private static final Random random = new Random();
+
+ private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
+
+ private static volatile boolean testInProgress = true;
+
+ private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<Integer,Value>();
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60; // NOTE(review): presumably the harness timeout for this test, in seconds (per the method name) - confirm
+ }
+
+ /** Runs NUM_SCANNERS concurrent slow scans and verifies that the tablet servers report at least that many distinct scan ids.
+ * @throws Exception any exception is a test failure.
+ */
+ @Test
+ public void testScanId() throws Exception {
+
+ final String tableName = getUniqueNames(1)[0];
+ Connector conn = getConnector();
+ conn.tableOperations().create(tableName);
+
+ addSplits(conn, tableName);
+
+ generateSampleData(conn, tableName);
+
+ attachSlowIterator(conn, tableName);
+
+ for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
+ ScannerThread st = new ScannerThread(conn, scannerIndex, tableName);
+ pool.submit(st); // NOTE(review): the returned Future is discarded, so an exception or assertion failure inside run() is not observed here - confirm
+ }
+
+ // poll until every scanner has reported at least one result.
+ while (testInProgress) {
+
+ if (resultsByWorker.size() < NUM_SCANNERS) {
+ log.trace("Results reported {}", resultsByWorker.size());
+ UtilWaitThread.sleep(750);
+ } else {
+ // each worker has reported at least one result; clearing the flag ends this loop and signals the workers to stop.
+ testInProgress = false;
+
+ log.debug("Final result count {}", resultsByWorker.size());
+
+ // delay to allow scanners to react to end of test and cleanly close.
+ UtilWaitThread.sleep(1000);
+ }
+
+ }
+
+ // all scanners have reported at least 1 result, so collect the scan ids (the set keeps only unique ids). NOTE(review): runs after workers were told to stop - confirm their sessions are still listed.
+ Set<Long> scanIds = new HashSet<Long>();
+
+ List<String> tservers = conn.instanceOperations().getTabletServers();
+
+ log.debug("tablet servers {}", tservers.toString());
+
+ for (String tserver : tservers) {
+
+ List<ActiveScan> activeScans = conn.instanceOperations().getActiveScans(tserver);
+
+ log.debug("TServer {} has {} active scans", tserver, activeScans.size());
+
+ for (ActiveScan scan : activeScans) {
+ log.debug("Tserver {} scan id {}", tserver, scan.getScanid());
+ scanIds.add(scan.getScanid());
+ }
+ }
+
+ assertTrue(NUM_SCANNERS <= scanIds.size());
+
+ }
+
+ /**
+ * Runs a scanner in a separate thread to allow multiple scanners to execute in parallel.
+ * <p/>
+ * The run method returns when it observes that the testInProgress flag is false, or when the scan runs out of data.
+ */
+ private static class ScannerThread implements Runnable {
+
+ private final Connector connector;
+ private Scanner scanner = null;
+ private final int workerIndex;
+ private final String tablename;
+
+ public ScannerThread(final Connector connector, final int workerIndex, final String tablename) {
+
+ this.connector = connector;
+ this.workerIndex = workerIndex;
+ this.tablename = tablename;
+
+ }
+
+ /**
+ * Execute the scan across the sample data, storing the most recent value for this worker in the result map,
+ * until the testInProgress flag is set to false.
+ */
+ @Override public void run() {
+
+ /*
+ * set a random initial delay of up to 5000 (presumably milliseconds - confirm) to
+ * allow scanners to proceed to different points.
+ */
+
+ long delay = random.nextInt(5000);
+
+ log.trace("Start delay for worker thread {} is {}", workerIndex, delay);
+
+ UtilWaitThread.sleep(delay);
+
+ try {
+
+ scanner = connector.createScanner(tablename, new Authorizations());
+
+ // Never start readahead
+ scanner.setReadaheadThreshold(Long.MAX_VALUE);
+ scanner.setBatchSize(1);
+
+ // create different ranges (starting at this worker's index, ending at "9") to try to hit more than one tablet.
+ scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
+
+ } catch (TableNotFoundException e) {
+ throw new IllegalStateException("Initialization failure. Could not create scanner", e);
+ }
+
+ scanner.fetchColumnFamily(new Text("fam1"));
+
+ for (Map.Entry<Key,Value> entry : scanner) {
+
+ // exit when success condition is met (the test has cleared the testInProgress flag).
+ if (!testInProgress) {
+ scanner.clearScanIterators();
+ scanner.close();
+
+ return;
+ }
+
+ Text row = entry.getKey().getRow();
+
+ log.trace("worker {}, row {}", workerIndex, row.toString());
+
+ if (entry.getValue() != null) {
+
+ Value prevValue = resultsByWorker.put(workerIndex, entry.getValue());
+
+ // NOTE(review): the original comment and the trace log say values increase, but the assert below requires prevValue > current value - confirm intent.
+ if (prevValue != null) {
+
+ log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s", prevValue, entry.getValue()));
+
+ assertTrue(prevValue.compareTo(entry.getValue()) > 0);
+ }
+ } else {
+ log.info("Scanner returned null");
+ fail("Scanner returned unexpected null value");
+ }
+
+ }
+
+ log.debug("Scanner ran out of data. (info only, not an error) ");
+
+ }
+ }
+
+ /**
+ * Create splits on the table, then force migration by taking the table offline and bringing it back online for the test.
+ *
+ * @param conn Accumulo connector to test cluster or MAC instance.
+ * @param tableName name of the table to split.
+ */
+ private void addSplits(final Connector conn, final String tableName) {
+
+ SortedSet<Text> splits = createSplits();
+
+ try {
+
+ conn.tableOperations().addSplits(tableName, splits);
+
+ conn.tableOperations().offline(tableName, true);
+
+ UtilWaitThread.sleep(2000);
+ conn.tableOperations().online(tableName, true);
+
+ for (Text split : conn.tableOperations().listSplits(tableName)) {
+ log.trace("Split {}", split);
+ }
+
+ } catch (AccumuloSecurityException e) {
+ throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+ } catch (TableNotFoundException e) {
+ throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+ } catch (AccumuloException e) {
+ throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+ }
+
+ }
+
+ /**
+ * Create ten split points, the strings "0" through "9", to distribute data across multiple tservers.
+ *
+ * @return splits in a sorted set for addSplits.
+ */
+ private SortedSet<Text> createSplits() {
+
+ SortedSet<Text> splits = new TreeSet<Text>();
+
+ for (int split = 0; split < 10; split++) {
+ splits.add(new Text(Integer.toString(split)));
+ }
+
+ return splits;
+ }
+
+ /**
+ * Generate some sample data using random row ids to distribute across splits.
+ * <p/>
+ * The primary goal is to determine that each scanner is assigned a unique scan id.
+ * This test does check that the count value for fam1 increases if a scanner reads multiple values, but this is a secondary consideration that is included for completeness.
+ *
+ * @param connector Accumulo connector to test cluster or MAC instance.
+ * @param tablename name of the table that receives the sample rows.
+ */
+ private void generateSampleData(Connector connector, final String tablename) {
+
+ try {
+
+ BatchWriter bw = connector.createBatchWriter(tablename, new BatchWriterConfig());
+
+ ColumnVisibility vis = new ColumnVisibility("public");
+
+ for (int i = 0; i < NUM_DATA_ROWS; i++) {
+
+ Text rowId = new Text(String.format("%d", ((random.nextInt(10) * 100) + i)));
+
+ Mutation m = new Mutation(rowId);
+ m.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(UTF_8)));
+ m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS - i).getBytes(UTF_8)));
+ m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i - NUM_DATA_ROWS).getBytes(UTF_8)));
+
+ log.trace("Added row {}", rowId);
+
+ bw.addMutation(m);
+ }
+
+ bw.close();
+ } catch (TableNotFoundException ex) {
+ throw new IllegalStateException("Initialization failed. Could not create test data", ex);
+ } catch (MutationsRejectedException ex) {
+ throw new IllegalStateException("Initialization failed. Could not create test data", ex);
+ }
+ }
+
+ /**
+ * Attach the test slow iterator at scan scope so that we have time to read the scan id without creating a large dataset.
+ * Uses fairly large sleep and delay times because we are not concerned with how much data is read and we do not read all of the data.
+ *
+ * @param connector Accumulo connector to test cluster or MAC instance.
+ * @param tablename name of the table the iterator is attached to.
+ */
+ private void attachSlowIterator(Connector connector, final String tablename) {
+ try {
+
+ IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
+ slowIter.addOption("sleepTime", "200");
+ slowIter.addOption("seekSleepTime", "200");
+
+ connector.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan));
+
+ } catch (AccumuloException ex) {
+ throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+ } catch (TableNotFoundException ex) {
+ throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+ } catch (AccumuloSecurityException ex) {
+ throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ade42586/trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java
----------------------------------------------------------------------
diff --git a/trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java b/trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java
index 1046149..6549aa2 100644
--- a/trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java
+++ b/trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java
@@ -49,11 +49,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("all") public class TInfo implements org.apache.thrift.TBase<TInfo, TInfo._Fields>, java.io.Serializable, Cloneable, Comparable<TInfo> {
-
- //ACCUMULO-3132
- //Total hack around the serialization of TInfo into zookeeper
- private static final long serialVersionUID = -4659975753252858243l;
-
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TInfo");
private static final org.apache.thrift.protocol.TField TRACE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("traceId", org.apache.thrift.protocol.TType.I64, (short)1);
@@ -68,6 +63,11 @@ import org.slf4j.LoggerFactory;
public long traceId; // required
public long parentId; // required
+ // ACCUMULO-2641 modified thrift by adding the optional scanId, which changes this signature.
+ // The serialVersionUID below can be removed once the org.apache.accumulo.master.state.TraceRepoDeserializationTest
+ // data is regenerated.
+ private static final long serialVersionUID = -4659975753252858243L;
+
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
TRACE_ID((short)1, "traceId"),