You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by cj...@apache.org on 2015/01/08 05:40:46 UTC
[4/4] accumulo git commit: Merge branch '1.6'
Merge branch '1.6'
Conflicts:
server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/44e2b2cd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/44e2b2cd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/44e2b2cd
Branch: refs/heads/master
Commit: 44e2b2cdf89b26a94540e653386bbe9ab269296f
Parents: 44ad36c 3bf4666
Author: Corey J. Nolet <cj...@gmail.com>
Authored: Wed Jan 7 23:40:33 2015 -0500
Committer: Corey J. Nolet <cj...@gmail.com>
Committed: Wed Jan 7 23:40:33 2015 -0500
----------------------------------------------------------------------
.../core/client/impl/ActiveScanImpl.java | 5 +-
.../core/tabletserver/thrift/ActiveScan.java | 102 +++++-
core/src/main/thrift/tabletserver.thrift | 1 +
.../tserver/session/SessionManager.java | 8 +-
.../accumulo/test/functional/ScanIdIT.java | 360 +++++++++++++++++++
5 files changed, 469 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/44e2b2cd/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/44e2b2cd/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index c9445c6,0000000..13049e2
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@@ -1,313 -1,0 +1,317 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TimerTask;
+
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translators;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ScanState;
+import org.apache.accumulo.core.tabletserver.thrift.ScanType;
+import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.tserver.scan.ScanRunState;
+import org.apache.accumulo.tserver.scan.ScanTask;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+import org.apache.log4j.Logger;
+
+public class SessionManager {
+ private static final Logger log = Logger.getLogger(SessionManager.class);
+
+ private final SecureRandom random = new SecureRandom();
+ private final Map<Long,Session> sessions = new HashMap<Long,Session>();
+ private final long maxIdle;
+ private final AccumuloConfiguration aconf;
+
+ public SessionManager(AccumuloConfiguration conf) {
+ aconf = conf;
+ maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ sweep(maxIdle);
+ }
+ };
+
+ SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
+ }
+
+ public synchronized long createSession(Session session, boolean reserve) {
+ long sid = random.nextLong();
+
+ while (sessions.containsKey(sid)) {
+ sid = random.nextLong();
+ }
+
+ sessions.put(sid, session);
+
+ session.reserved = reserve;
+
+ session.startTime = session.lastAccessTime = System.currentTimeMillis();
+
+ return sid;
+ }
+
+ public long getMaxIdleTime() {
+ return maxIdle;
+ }
+
+ /**
+ * while a session is reserved, it cannot be canceled or removed
+ */
+
+ public synchronized Session reserveSession(long sessionId) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ if (session.reserved)
+ throw new IllegalStateException();
+ session.reserved = true;
+ }
+
+ return session;
+
+ }
+
+ public synchronized Session reserveSession(long sessionId, boolean wait) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ while (wait && session.reserved) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException();
+ }
+ }
+
+ if (session.reserved)
+ throw new IllegalStateException();
+ session.reserved = true;
+ }
+
+ return session;
+
+ }
+
+ public synchronized void unreserveSession(Session session) {
+ if (!session.reserved)
+ throw new IllegalStateException();
+ notifyAll();
+ session.reserved = false;
+ session.lastAccessTime = System.currentTimeMillis();
+ }
+
+ public synchronized void unreserveSession(long sessionId) {
+ Session session = getSession(sessionId);
+ if (session != null)
+ unreserveSession(session);
+ }
+
+ public synchronized Session getSession(long sessionId) {
+ Session session = sessions.get(sessionId);
+ if (session != null)
+ session.lastAccessTime = System.currentTimeMillis();
+ return session;
+ }
+
+ public Session removeSession(long sessionId) {
+ return removeSession(sessionId, false);
+ }
+
+ public Session removeSession(long sessionId, boolean unreserve) {
+ Session session = null;
+ synchronized (this) {
+ session = sessions.remove(sessionId);
+ if (unreserve && session != null)
+ unreserveSession(session);
+ }
+
+ // do clean up out side of lock..
+ if (session != null)
+ session.cleanup();
+
+ return session;
+ }
+
+ private void sweep(long maxIdle) {
+ List<Session> sessionsToCleanup = new ArrayList<Session>();
+ synchronized (this) {
+ Iterator<Session> iter = sessions.values().iterator();
+ while (iter.hasNext()) {
+ Session session = iter.next();
+ long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+ if (idleTime > maxIdle && !session.reserved) {
+ log.info("Closing idle session from user=" + session.getUser() + ", client=" + session.client + ", idle=" + idleTime + "ms");
+ iter.remove();
+ sessionsToCleanup.add(session);
+ }
+ }
+ }
+
+ // do clean up outside of lock
+ for (Session session : sessionsToCleanup) {
+ session.cleanup();
+ }
+ }
+
+ public synchronized void removeIfNotAccessed(final long sessionId, final long delay) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ final long removeTime = session.lastAccessTime;
+ TimerTask r = new TimerTask() {
+ @Override
+ public void run() {
+ Session sessionToCleanup = null;
+ synchronized (SessionManager.this) {
+ Session session2 = sessions.get(sessionId);
+ if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
+ log.info("Closing not accessed session from user=" + session2.getUser() + ", client=" + session2.client + ", duration=" + delay + "ms");
+ sessions.remove(sessionId);
+ sessionToCleanup = session2;
+ }
+ }
+
+ // call clean up outside of lock
+ if (sessionToCleanup != null)
+ sessionToCleanup.cleanup();
+ }
+ };
+
+ SimpleTimer.getInstance(aconf).schedule(r, delay);
+ }
+ }
+
+ public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
+ Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
+ for (Entry<Long,Session> entry : sessions.entrySet()) {
+
+ Session session = entry.getValue();
+ @SuppressWarnings("rawtypes")
+ ScanTask nbt = null;
+ String tableID = null;
+
+ if (session instanceof ScanSession) {
+ ScanSession ss = (ScanSession) session;
+ nbt = ss.nextBatchTask;
+ tableID = ss.extent.getTableId().toString();
+ } else if (session instanceof MultiScanSession) {
+ MultiScanSession mss = (MultiScanSession) session;
+ nbt = mss.lookupTask;
+ tableID = mss.threadPoolExtent.getTableId().toString();
+ }
+
+ if (nbt == null)
+ continue;
+
+ ScanRunState srs = nbt.getScanRunState();
+
+ if (srs == ScanRunState.FINISHED)
+ continue;
+
+ MapCounter<ScanRunState> stateCounts = counts.get(tableID);
+ if (stateCounts == null) {
+ stateCounts = new MapCounter<ScanRunState>();
+ counts.put(tableID, stateCounts);
+ }
+
+ stateCounts.increment(srs, 1);
+ }
+
+ return counts;
+ }
+
+ public synchronized List<ActiveScan> getActiveScans() {
+
+ List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+
+ long ct = System.currentTimeMillis();
+
+ for (Entry<Long,Session> entry : sessions.entrySet()) {
+ Session session = entry.getValue();
+ if (session instanceof ScanSession) {
+ ScanSession ss = (ScanSession) session;
+
+ ScanState state = ScanState.RUNNING;
+
+ ScanTask<ScanBatch> nbt = ss.nextBatchTask;
+ if (nbt == null) {
+ state = ScanState.IDLE;
+ } else {
+ switch (nbt.getScanRunState()) {
+ case QUEUED:
+ state = ScanState.QUEUED;
+ break;
+ case FINISHED:
+ state = ScanState.IDLE;
+ break;
+ case RUNNING:
+ default:
+ /* do nothing */
+ break;
+ }
+ }
+
- activeScans.add(new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
- state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
++ ActiveScan activeScan = new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
++ state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB());
++
++ // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in ActiveScan constructor
++ activeScan.setScanId(entry.getKey());
++ activeScans.add(activeScan);
+
+ } else if (session instanceof MultiScanSession) {
+ MultiScanSession mss = (MultiScanSession) session;
+
+ ScanState state = ScanState.RUNNING;
+
+ ScanTask<MultiScanResult> nbt = mss.lookupTask;
+ if (nbt == null) {
+ state = ScanState.IDLE;
+ } else {
+ switch (nbt.getScanRunState()) {
+ case QUEUED:
+ state = ScanState.QUEUED;
+ break;
+ case FINISHED:
+ state = ScanState.IDLE;
+ break;
+ case RUNNING:
+ default:
+ /* do nothing */
+ break;
+ }
+ }
+
+ activeScans.add(new ActiveScan(mss.client, mss.getUser(), mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
+ ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths
+ .getAuthorizationsBB()));
+ }
+ }
+
+ return activeScans;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/44e2b2cd/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
index 0000000,178cb30..fe2e8cb
mode 000000,100644..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@@ -1,0 -1,360 +1,360 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.accumulo.test.functional;
+
++import java.util.EnumSet;
++import java.util.HashSet;
++import java.util.List;
++import java.util.Map;
++import java.util.Random;
++import java.util.Set;
++import java.util.SortedSet;
++import java.util.TreeSet;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++
+ import org.apache.accumulo.core.client.AccumuloException;
+ import org.apache.accumulo.core.client.AccumuloSecurityException;
+ import org.apache.accumulo.core.client.BatchWriter;
+ import org.apache.accumulo.core.client.BatchWriterConfig;
+ import org.apache.accumulo.core.client.Connector;
+ import org.apache.accumulo.core.client.IteratorSetting;
+ import org.apache.accumulo.core.client.MutationsRejectedException;
+ import org.apache.accumulo.core.client.Scanner;
+ import org.apache.accumulo.core.client.TableNotFoundException;
+ import org.apache.accumulo.core.client.admin.ActiveScan;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.Range;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.iterators.IteratorUtil;
+ import org.apache.accumulo.core.security.Authorizations;
+ import org.apache.accumulo.core.security.ColumnVisibility;
+ import org.apache.accumulo.core.util.UtilWaitThread;
+ import org.apache.accumulo.harness.AccumuloClusterIT;
+ import org.apache.hadoop.io.Text;
+ import org.junit.Test;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
+ import static com.google.common.base.Charsets.UTF_8;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+
+ /**
+ * ACCUMULO-2641 Integration test. ACCUMULO-2641 Adds scan id to thrift protocol so that {@code org.apache.accumulo.core.client.admin.ActiveScan.getScanid()}
+ * returns a unique scan id.<p>
+ * <p/>
+ * The test uses the Minicluster and the {@code org.apache.accumulo.test.functional.SlowIterator} to create multiple scan sessions.
+ * The test exercises multiple tablet servers with splits and multiple ranges to force the scans to occur across multiple tablet servers
+ * for completeness.
+ * <p/>
+ * This patch modified thrift, the TraceRepoDeserializationTest test seems to fail unless the following be added:
+ * <p/>
+ * private static final long serialVersionUID = -4659975753252858243l;
+ * <p/>
+ * back into org.apache.accumulo.trace.thrift.TInfo until that test signature is regenerated.
+ */
+ public class ScanIdIT extends AccumuloClusterIT {
+
+ private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
+
+ private static final int NUM_SCANNERS = 8;
+
+ private static final int NUM_DATA_ROWS = 100;
+
+ private static final Random random = new Random();
+
+ private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
+
+ private static volatile boolean testInProgress = true;
+
+ private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<Integer,Value>();
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ /**
+ * @throws Exception any exception is a test failure.
+ */
+ @Test
+ public void testScanId() throws Exception {
+
+ final String tableName = getUniqueNames(1)[0];
+ Connector conn = getConnector();
+ conn.tableOperations().create(tableName);
+
+ addSplits(conn, tableName);
+
+ generateSampleData(conn, tableName);
+
+ attachSlowIterator(conn, tableName);
+
+ for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
+ ScannerThread st = new ScannerThread(conn, scannerIndex, tableName);
+ pool.submit(st);
+ }
+
+ // wait for scanners to report a result.
+ while (testInProgress) {
+
+ if (resultsByWorker.size() < NUM_SCANNERS) {
+ log.trace("Results reported {}", resultsByWorker.size());
+ UtilWaitThread.sleep(750);
+ } else {
+ // each worker has reported at least one result.
+ testInProgress = false;
+
+ log.debug("Final result count {}", resultsByWorker.size());
+
+ // delay to allow scanners to react to end of test and cleanly close.
+ UtilWaitThread.sleep(1000);
+ }
+
+ }
+
+ // all scanner have reported at least 1 result, so check for unique scan ids.
+ Set<Long> scanIds = new HashSet<Long>();
+
+ List<String> tservers = conn.instanceOperations().getTabletServers();
+
+ log.debug("tablet servers {}", tservers.toString());
+
+ for (String tserver : tservers) {
+
+ List<ActiveScan> activeScans = conn.instanceOperations().getActiveScans(tserver);
+
+ log.debug("TServer {} has {} active scans", tserver, activeScans.size());
+
+ for (ActiveScan scan : activeScans) {
+ log.debug("Tserver {} scan id {}", tserver, scan.getScanid());
+ scanIds.add(scan.getScanid());
+ }
+ }
+
+ assertTrue(NUM_SCANNERS <= scanIds.size());
+
+ }
+
+ /**
+ * Runs scanner in separate thread to allow multiple scanners to execute in parallel.
+ * <p/>
+ * The thread run method is terminated when the testInProgress flag is set to false.
+ */
+ private static class ScannerThread implements Runnable {
+
+ private final Connector connector;
+ private Scanner scanner = null;
+ private final int workerIndex;
+ private final String tablename;
+
+ public ScannerThread(final Connector connector, final int workerIndex, final String tablename) {
+
+ this.connector = connector;
+ this.workerIndex = workerIndex;
+ this.tablename = tablename;
+
+ }
+
+ /**
+ * execute the scan across the sample data and put scan result into result map until
+ * testInProgress flag is set to false.
+ */
+ @Override public void run() {
+
+ /*
+ * set random initial delay of up to to
+ * allow scanners to proceed to different points.
+ */
+
+ long delay = random.nextInt(5000);
+
+ log.trace("Start delay for worker thread {} is {}", workerIndex, delay);
+
+ UtilWaitThread.sleep(delay);
+
+ try {
+
+ scanner = connector.createScanner(tablename, new Authorizations());
+
+ // Never start readahead
+ scanner.setReadaheadThreshold(Long.MAX_VALUE);
+ scanner.setBatchSize(1);
+
+ // create different ranges to try to hit more than one tablet.
+ scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
+
+ } catch (TableNotFoundException e) {
+ throw new IllegalStateException("Initialization failure. Could not create scanner", e);
+ }
+
+ scanner.fetchColumnFamily(new Text("fam1"));
+
+ for (Map.Entry<Key,Value> entry : scanner) {
+
+ // exit when success condition is met.
+ if (!testInProgress) {
+ scanner.clearScanIterators();
+ scanner.close();
+
+ return;
+ }
+
+ Text row = entry.getKey().getRow();
+
+ log.trace("worker {}, row {}", workerIndex, row.toString());
+
+ if (entry.getValue() != null) {
+
+ Value prevValue = resultsByWorker.put(workerIndex, entry.getValue());
+
+ // value should always being increasing
+ if (prevValue != null) {
+
+ log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s", prevValue, entry.getValue()));
+
+ assertTrue(prevValue.compareTo(entry.getValue()) > 0);
+ }
+ } else {
+ log.info("Scanner returned null");
+ fail("Scanner returned unexpected null value");
+ }
+
+ }
+
+ log.debug("Scanner ran out of data. (info only, not an error) ");
+
+ }
+ }
+
+ /**
+ * Create splits on table and force migration by taking table offline and then bring back
+ * online for test.
+ *
+ * @param conn Accumulo connector Accumulo connector to test cluster or MAC instance.
+ */
+ private void addSplits(final Connector conn, final String tableName) {
+
+ SortedSet<Text> splits = createSplits();
+
+ try {
+
+ conn.tableOperations().addSplits(tableName, splits);
+
+ conn.tableOperations().offline(tableName, true);
+
+ UtilWaitThread.sleep(2000);
+ conn.tableOperations().online(tableName, true);
+
+ for (Text split : conn.tableOperations().listSplits(tableName)) {
+ log.trace("Split {}", split);
+ }
+
+ } catch (AccumuloSecurityException e) {
+ throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+ } catch (TableNotFoundException e) {
+ throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+ } catch (AccumuloException e) {
+ throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+ }
+
+ }
+
+ /**
+ * Create splits to distribute data across multiple tservers.
+ *
+ * @return splits in sorted set for addSplits.
+ */
+ private SortedSet<Text> createSplits() {
+
+ SortedSet<Text> splits = new TreeSet<Text>();
+
+ for (int split = 0; split < 10; split++) {
+ splits.add(new Text(Integer.toString(split)));
+ }
+
+ return splits;
+ }
+
+ /**
+ * Generate some sample data using random row id to distribute across splits.
+ * <p/>
+ * The primary goal is to determine that each scanner is assigned a unique scan id.
+ * This test does check that the count value for fam1 increases if a scanner reads multiple value, but this is
+ * secondary consideration for this test, that is included for completeness.
+ *
+ * @param connector Accumulo connector Accumulo connector to test cluster or MAC instance.
+ */
+ private void generateSampleData(Connector connector, final String tablename) {
+
+ try {
+
+ BatchWriter bw = connector.createBatchWriter(tablename, new BatchWriterConfig());
+
+ ColumnVisibility vis = new ColumnVisibility("public");
+
+ for (int i = 0; i < NUM_DATA_ROWS; i++) {
+
+ Text rowId = new Text(String.format("%d", ((random.nextInt(10) * 100) + i)));
+
+ Mutation m = new Mutation(rowId);
+ m.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(UTF_8)));
+ m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS - i).getBytes(UTF_8)));
+ m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i - NUM_DATA_ROWS).getBytes(UTF_8)));
+
+ log.trace("Added row {}", rowId);
+
+ bw.addMutation(m);
+ }
+
+ bw.close();
+ } catch (TableNotFoundException ex) {
+ throw new IllegalStateException("Initialization failed. Could not create test data", ex);
+ } catch (MutationsRejectedException ex) {
+ throw new IllegalStateException("Initialization failed. Could not create test data", ex);
+ }
+ }
+
+ /**
+ * Attach the test slow iterator so that we have time to read the scan id without creating a large dataset. Uses a
+ * fairly large sleep and delay times because we are not concerned with how much data is read and we do not read
+ * all of the data - the test stops once each scanner reports a scan id.
+ *
+ * @param connector Accumulo connector Accumulo connector to test cluster or MAC instance.
+ */
+ private void attachSlowIterator(Connector connector, final String tablename) {
+ try {
+
+ IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
+ slowIter.addOption("sleepTime", "200");
+ slowIter.addOption("seekSleepTime", "200");
+
+ connector.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan));
+
+ } catch (AccumuloException ex) {
+ throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+ } catch (TableNotFoundException ex) {
+ throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+ } catch (AccumuloSecurityException ex) {
+ throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+ }
+ }
+
+ }