You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/03/07 00:47:28 UTC
svn commit: r1297799 - in /incubator/accumulo/trunk: ./ docs/examples/
src/core/src/main/java/org/apache/accumulo/core/client/
src/core/src/main/java/org/apache/accumulo/core/client/admin/
src/core/src/main/java/org/apache/accumulo/core/client/impl/ sr...
Author: kturner
Date: Tue Mar 6 23:47:28 2012
New Revision: 1297799
URL: http://svn.apache.org/viewvc?rev=1297799&view=rev
Log:
ACCUMULO-387 ACCUMULO-175 (merged from 1.4)
* initial checkin of map reduce directly over files
* stopped using AccumuloClassLoader for client side code that loads iterators.
Added:
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
- copied unchanged from r1297794, incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
- copied unchanged from r1297794, incubator/accumulo/branches/1.4/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
Modified:
incubator/accumulo/trunk/ (props changed)
incubator/accumulo/trunk/docs/examples/README.mapred
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
incubator/accumulo/trunk/test/system/continuous/continuous-env.sh.example
incubator/accumulo/trunk/test/system/continuous/run-verify.sh
Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 6 23:47:28 2012
@@ -1,3 +1,3 @@
/incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873,1245632
/incubator/accumulo/branches/1.3.5rc:1209938
-/incubator/accumulo/branches/1.4:1201902-1297762
+/incubator/accumulo/branches/1.4:1201902-1297794
Modified: incubator/accumulo/trunk/docs/examples/README.mapred
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/docs/examples/README.mapred?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/docs/examples/README.mapred (original)
+++ incubator/accumulo/trunk/docs/examples/README.mapred Tue Mar 6 23:47:28 2012
@@ -89,3 +89,9 @@ counts.
tserver, count:20080906 [] 1
tserver.compaction.major.concurrent.max count:20080906 [] 1
...
+
+Another example to look at is
+org.apache.accumulo.examples.simple.mapreduce.UniqueColumns. This example
+computes the unique set of columns in a table and shows how a map reduce job
+can directly read a table's files from HDFS.
+
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java Tue Mar 6 23:47:28 2012
@@ -188,7 +188,7 @@ public class ClientSideIteratorScanner e
@Override
public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {}
- });
+ }, false);
} catch (IOException e) {
throw new RuntimeException(e);
}
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java Tue Mar 6 23:47:28 2012
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.A
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
@@ -847,11 +848,18 @@ public class TableOperationsImpl extends
return Collections.singleton(range);
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
- TabletLocator tl = TabletLocator.getInstance(instance, credentials, new Text(Tables.getTableId(instance, tableName)));
+ String tableId = Tables.getTableId(instance, tableName);
+ TabletLocator tl = TabletLocator.getInstance(instance, credentials, new Text(tableId));
while (!tl.binRanges(Collections.singletonList(range), binnedRanges).isEmpty()) {
+ if (!Tables.exists(instance, tableId))
+ throw new TableDeletedException(tableId);
+ if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+ throw new TableOfflineException(instance, tableId);
+
log.warn("Unable to locate bins for specified range. Retrying.");
// sleep randomly between 100 and 200ms
UtilWaitThread.sleep(100 + (int) (Math.random() * 100));
+ binnedRanges.clear();
}
// group key extents to get <= maxSplits
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Tue Mar 6 23:47:28 2012
@@ -48,9 +48,13 @@ import org.apache.accumulo.core.client.C
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.OfflineScanner;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mock.MockInstance;
@@ -63,10 +67,12 @@ import org.apache.accumulo.core.data.Ran
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
@@ -132,6 +138,8 @@ public abstract class InputFormatBase<K,
private static final String ITERATORS_OPTIONS = PREFIX + ".iterators.options";
private static final String ITERATORS_DELIM = ",";
+ private static final String READ_OFFLINE = PREFIX + ".read.offline";
+
/**
* Enable or disable use of the {@link IsolatedScanner} in this configuration object. By default it is not enabled.
*
@@ -269,6 +277,32 @@ public abstract class InputFormatBase<K,
}
/**
+ * Enable reading offline tables. This will make the map reduce job directly read the table's files. If the table is not offline, then the job will fail. If
+ * the table comes online during the map reduce job, it's likely that the job will fail.
+ *
+ * To use this option, the map reduce user will need access to read the accumulo directory in HDFS.
+ *
+ * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
+ * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the accumulo directory in HDFS are non-standard.
+ *
+ * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
+ * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
+ * reason to do this is that compaction will reduce each tablet in the table to one file, and it's faster to read from one file.
+ *
+ * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
+ * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
+ *
+ * @param conf
+ * the job
+ * @param scanOff
+ * pass true to read offline tables
+ */
+
+ public static void setScanOffline(Configuration conf, boolean scanOff) {
+ conf.setBoolean(READ_OFFLINE, scanOff);
+ }
+
+ /**
* Restricts the columns that will be mapped over for this configuration object.
*
* @param conf
@@ -568,6 +602,12 @@ public abstract class InputFormatBase<K,
return conf.getInt(MAX_VERSIONS, -1);
}
+ protected static boolean isOfflineScan(Configuration conf) {
+ return conf.getBoolean(READ_OFFLINE, false);
+ }
+
+ // Return a list of the iterator settings (for iterators to apply to a scanner)
+
/**
* Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
*
@@ -684,7 +724,12 @@ public abstract class InputFormatBase<K,
Connector conn = instance.getConnector(user, password);
log.debug("Creating scanner for table: " + getTablename(attempt.getConfiguration()));
log.debug("Authorizations are: " + authorizations);
- scanner = conn.createScanner(getTablename(attempt.getConfiguration()), authorizations);
+ if (isOfflineScan(attempt.getConfiguration())) {
+ scanner = new OfflineScanner(instance, new AuthInfo(user, ByteBuffer.wrap(password), instance.getInstanceID()), Tables.getTableId(instance,
+ getTablename(attempt.getConfiguration())), authorizations);
+ } else {
+ scanner = conn.createScanner(getTablename(attempt.getConfiguration()), authorizations);
+ }
if (isIsolated(attempt.getConfiguration())) {
log.info("Creating isolated scanner");
scanner = new IsolatedScanner(scanner);
@@ -742,6 +787,106 @@ public abstract class InputFormatBase<K,
}
}
+ Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext job, String tableName, List<Range> ranges) throws TableNotFoundException,
+ AccumuloException, AccumuloSecurityException {
+
+ Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+
+ Instance instance = getInstance(job.getConfiguration());
+ Connector conn = instance.getConnector(getUsername(job.getConfiguration()), getPassword(job.getConfiguration()));
+ String tableId = Tables.getTableId(instance, tableName);
+
+ if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+ Tables.clearCache(instance);
+ if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+ throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
+ }
+ }
+
+ for (Range range : ranges) {
+ Text startRow;
+
+ if (range.getStartKey() != null)
+ startRow = range.getStartKey().getRow();
+ else
+ startRow = new Text();
+
+ Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
+ Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+ ColumnFQ.fetch(scanner, Constants.METADATA_PREV_ROW_COLUMN);
+ scanner.fetchColumnFamily(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY);
+ scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
+ scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
+ scanner.setRange(metadataRange);
+
+ RowIterator rowIter = new RowIterator(scanner);
+
+ // TODO check that extents match prev extent
+
+ KeyExtent lastExtent = null;
+
+ while (rowIter.hasNext()) {
+ Iterator<Entry<Key,Value>> row = rowIter.next();
+ String last = "";
+ KeyExtent extent = null;
+ String location = null;
+
+ while (row.hasNext()) {
+ Entry<Key,Value> entry = row.next();
+ Key key = entry.getKey();
+
+ if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) {
+ last = entry.getValue().toString();
+ }
+
+ if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)
+ || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
+ location = entry.getValue().toString();
+ }
+
+ if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
+ extent = new KeyExtent(key.getRow(), entry.getValue());
+ }
+
+ }
+
+ if (location != null)
+ return null;
+
+ if (!extent.getTableId().toString().equals(tableId)) {
+ throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
+ }
+
+ if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
+ throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
+ }
+
+ Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
+ if (tabletRanges == null) {
+ tabletRanges = new HashMap<KeyExtent,List<Range>>();
+ binnedRanges.put(last, tabletRanges);
+ }
+
+ List<Range> rangeList = tabletRanges.get(extent);
+ if (rangeList == null) {
+ rangeList = new ArrayList<Range>();
+ tabletRanges.put(extent, rangeList);
+ }
+
+ rangeList.add(range);
+
+ if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
+ break;
+ }
+
+ lastExtent = extent;
+ }
+
+ }
+
+ return binnedRanges;
+ }
+
/**
* Read the metadata table to get tablets and match up ranges to them.
*/
@@ -762,10 +907,30 @@ public abstract class InputFormatBase<K,
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
TabletLocator tl;
try {
- tl = getTabletLocator(job.getConfiguration());
- while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
- log.warn("Unable to locate bins for specified ranges. Retrying.");
- UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+ if (isOfflineScan(job.getConfiguration())) {
+ binnedRanges = binOfflineTable(job, tableName, ranges);
+ while (binnedRanges == null) {
+ // Some tablets were still online, try again
+ UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+ binnedRanges = binOfflineTable(job, tableName, ranges);
+ }
+ } else {
+ Instance instance = getInstance(job.getConfiguration());
+ String tableId = null;
+ tl = getTabletLocator(job.getConfiguration());
+ while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
+ if (!(instance instanceof MockInstance)) {
+ if (tableId == null)
+ tableId = Tables.getTableId(instance, tableName);
+ if (!Tables.exists(instance, tableId))
+ throw new TableDeletedException(tableId);
+ if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+ throw new TableOfflineException(instance, tableId);
+ }
+ binnedRanges.clear();
+ log.warn("Unable to locate bins for specified ranges. Retrying.");
+ UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+ }
}
} catch (Exception e) {
throw new IOException(e);
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java Tue Mar 6 23:47:28 2012
@@ -33,8 +33,8 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
import org.apache.accumulo.core.iterators.system.DeletingIterator;
@@ -106,7 +106,7 @@ public class MockScannerBase extends Sca
AccumuloConfiguration conf = new MockConfiguration(table.settings);
MockIteratorEnvironment iterEnv = new MockIteratorEnvironment();
SortedKeyValueIterator<Key,Value> result = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, vf, null, conf,
- serverSideIteratorList, serverSideIteratorOptions, iterEnv));
+ serverSideIteratorList, serverSideIteratorOptions, iterEnv, false));
return result;
}
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java Tue Mar 6 23:47:28 2012
@@ -185,6 +185,12 @@ public class IteratorUtil {
public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
IteratorEnvironment env) throws IOException {
+ return loadIterators(scope, source, extent, conf, ssiList, ssio, env, true);
+ }
+
+ public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
+ SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+ IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
List<IterInfo> iters = new ArrayList<IterInfo>(ssiList);
Map<String,Map<String,String>> allOptions = new HashMap<String,Map<String,String>>();
@@ -201,18 +207,23 @@ public class IteratorUtil {
}
}
- return loadIterators(source, iters, allOptions, env);
+ return loadIterators(source, iters, allOptions, env, useAccumuloClassLoader);
}
+ @SuppressWarnings("unchecked")
public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source,
- Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env) throws IOException {
+ Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
SortedKeyValueIterator<K,V> prev = source;
try {
for (IterInfo iterInfo : iters) {
- @SuppressWarnings("unchecked")
- Class<? extends SortedKeyValueIterator<K,V>> clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloClassLoader.loadClass(iterInfo.className,
- SortedKeyValueIterator.class);
+
+ Class<? extends SortedKeyValueIterator<K,V>> clazz;
+ if (useAccumuloClassLoader){
+ clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloClassLoader.loadClass(iterInfo.className, SortedKeyValueIterator.class);
+ }else{
+ clazz = (Class<? extends SortedKeyValueIterator<K,V>>) Class.forName(iterInfo.className).asSubclass(SortedKeyValueIterator.class);
+ }
SortedKeyValueIterator<K,V> skvi = clazz.newInstance();
Map<String,String> options = iterOpts.get(iterInfo.iterName);
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java Tue Mar 6 23:47:28 2012
@@ -18,8 +18,12 @@ package org.apache.accumulo.server.test.
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
import java.util.Set;
+import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
@@ -130,10 +134,10 @@ public class ContinuousVerify extends Co
}
@Override
- public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- if (args.length != 8) {
+ public int run(String[] args) throws Exception {
+ if (args.length != 9) {
throw new IllegalArgumentException("Usage : " + ContinuousVerify.class.getName()
- + " <instance name> <zookeepers> <user> <pass> <table> <output dir> <max mappers> <num reducers>");
+ + " <instance name> <zookeepers> <user> <pass> <table> <output dir> <max mappers> <num reducers> <scan offline>");
}
String instance = args[0];
@@ -144,14 +148,26 @@ public class ContinuousVerify extends Co
String outputdir = args[5];
String maxMaps = args[6];
String reducers = args[7];
+ boolean scanOffline = Boolean.parseBoolean(args[8]);
Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
+ String clone = table;
+ Connector conn = null;
+ if (scanOffline) {
+ Random random = new Random();
+ clone = table + "_" + String.format("%016x", Math.abs(random.nextLong()));
+ ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers);
+ conn = zki.getConnector(user, pass.getBytes());
+ conn.tableOperations().clone(table, clone, true, new HashMap<String,String>(), new HashSet<String>());
+ conn.tableOperations().offline(clone);
+ }
+
job.setInputFormatClass(AccumuloInputFormat.class);
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), table, new Authorizations());
+ AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), clone, new Authorizations());
AccumuloInputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers);
-
+ AccumuloInputFormat.setScanOffline(job.getConfiguration(), scanOffline);
// set up ranges
try {
Set<Range> ranges = new ZooKeeperInstance(instance, zookeepers).getConnector(user, pass.getBytes()).tableOperations()
@@ -170,9 +186,17 @@ public class ContinuousVerify extends Co
job.setNumReduceTasks(Integer.parseInt(reducers));
job.setOutputFormatClass(TextOutputFormat.class);
+
+ job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", scanOffline);
+
TextOutputFormat.setOutputPath(job, new Path(outputdir));
job.waitForCompletion(true);
+
+ if (scanOffline) {
+ conn.tableOperations().delete(clone);
+ }
+
return job.isSuccessful() ? 0 : 1;
}
Modified: incubator/accumulo/trunk/test/system/continuous/continuous-env.sh.example
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/test/system/continuous/continuous-env.sh.example?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/test/system/continuous/continuous-env.sh.example (original)
+++ incubator/accumulo/trunk/test/system/continuous/continuous-env.sh.example Tue Mar 6 23:47:28 2012
@@ -77,6 +77,7 @@ MASTER_RESTART_SLEEP_TIME=2
VERFIY_OUT=/tmp/continuous_verify
VERIFY_MAX_MAPS=64
VERIFY_REDUCERS=64
+SCAN_OFFLINE=false
#settings related to the batch walker
BATCH_WALKER_SLEEP=180000
Modified: incubator/accumulo/trunk/test/system/continuous/run-verify.sh
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/test/system/continuous/run-verify.sh?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/test/system/continuous/run-verify.sh (original)
+++ incubator/accumulo/trunk/test/system/continuous/run-verify.sh Tue Mar 6 23:47:28 2012
@@ -18,5 +18,5 @@
. mapred-setup.sh
-$ACCUMULO_HOME/bin/tool.sh "$SERVER_LIBJAR" org.apache.accumulo.server.test.continuous.ContinuousVerify -libjars "$SERVER_LIBJAR" $INSTANCE_NAME $ZOO_KEEPERS $USER $PASS $TABLE $VERFIY_OUT $VERIFY_MAX_MAPS $VERIFY_REDUCERS
+$ACCUMULO_HOME/bin/tool.sh "$SERVER_LIBJAR" org.apache.accumulo.server.test.continuous.ContinuousVerify -libjars "$SERVER_LIBJAR" $INSTANCE_NAME $ZOO_KEEPERS $USER $PASS $TABLE $VERFIY_OUT $VERIFY_MAX_MAPS $VERIFY_REDUCERS $SCAN_OFFLINE