You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/03/07 00:47:28 UTC

svn commit: r1297799 - in /incubator/accumulo/trunk: ./ docs/examples/ src/core/src/main/java/org/apache/accumulo/core/client/ src/core/src/main/java/org/apache/accumulo/core/client/admin/ src/core/src/main/java/org/apache/accumulo/core/client/impl/ sr...

Author: kturner
Date: Tue Mar  6 23:47:28 2012
New Revision: 1297799

URL: http://svn.apache.org/viewvc?rev=1297799&view=rev
Log:
ACCUMULO-387 ACCUMULO-175 (merged from 1.4)
 * initial checkin of map reduce directly over files
 * stopped using AccumuloClassLoader for client side code that loads iterators.

Added:
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
      - copied unchanged from r1297794, incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
    incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
      - copied unchanged from r1297794, incubator/accumulo/branches/1.4/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
Modified:
    incubator/accumulo/trunk/   (props changed)
    incubator/accumulo/trunk/docs/examples/README.mapred
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
    incubator/accumulo/trunk/test/system/continuous/continuous-env.sh.example
    incubator/accumulo/trunk/test/system/continuous/run-verify.sh

Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar  6 23:47:28 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873,1245632
 /incubator/accumulo/branches/1.3.5rc:1209938
-/incubator/accumulo/branches/1.4:1201902-1297762
+/incubator/accumulo/branches/1.4:1201902-1297794

Modified: incubator/accumulo/trunk/docs/examples/README.mapred
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/docs/examples/README.mapred?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/docs/examples/README.mapred (original)
+++ incubator/accumulo/trunk/docs/examples/README.mapred Tue Mar  6 23:47:28 2012
@@ -89,3 +89,9 @@ counts.
     tserver, count:20080906 []    1
     tserver.compaction.major.concurrent.max count:20080906 []    1
     ...
+
+Another example to look at is
+org.apache.accumulo.examples.simple.mapreduce.UniqueColumns.  This example
+computes the unique set of columns in a table and shows how a map reduce job
+can directly read a table's files from HDFS.
+

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java Tue Mar  6 23:47:28 2012
@@ -188,7 +188,7 @@ public class ClientSideIteratorScanner e
         
         @Override
         public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {}
-      });
+      }, false);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java Tue Mar  6 23:47:28 2012
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.A
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
@@ -847,11 +848,18 @@ public class TableOperationsImpl extends
       return Collections.singleton(range);
     
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    TabletLocator tl = TabletLocator.getInstance(instance, credentials, new Text(Tables.getTableId(instance, tableName)));
+    String tableId = Tables.getTableId(instance, tableName);
+    TabletLocator tl = TabletLocator.getInstance(instance, credentials, new Text(tableId));
     while (!tl.binRanges(Collections.singletonList(range), binnedRanges).isEmpty()) {
+      if (!Tables.exists(instance, tableId))
+        throw new TableDeletedException(tableId);
+      if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+        throw new TableOfflineException(instance, tableId);
+
       log.warn("Unable to locate bins for specified range. Retrying.");
       // sleep randomly between 100 and 200ms
       UtilWaitThread.sleep(100 + (int) (Math.random() * 100));
+      binnedRanges.clear();
     }
     
     // group key extents to get <= maxSplits

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Tue Mar  6 23:47:28 2012
@@ -48,9 +48,13 @@ import org.apache.accumulo.core.client.C
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.OfflineScanner;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mock.MockInstance;
@@ -63,10 +67,12 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -132,6 +138,8 @@ public abstract class InputFormatBase<K,
   private static final String ITERATORS_OPTIONS = PREFIX + ".iterators.options";
   private static final String ITERATORS_DELIM = ",";
   
+  private static final String READ_OFFLINE = PREFIX + ".read.offline";
+
   /**
    * Enable or disable use of the {@link IsolatedScanner} in this configuration object. By default it is not enabled.
    * 
@@ -269,6 +277,32 @@ public abstract class InputFormatBase<K,
   }
   
   /**
+   * Enable reading offline tables. This will make the map reduce job directly read the table's files. If the table is not offline, then the job will fail. If
+   * the table comes online during the map reduce job, it is likely that the job will fail.
+   * 
+   * To use this option, the map reduce user will need access to read the accumulo directory in HDFS.
+   * 
+   * Reading the offline table will create the scan-time iterator stack in the map process, so any iterators that are configured for the table will need to be
+   * on the mappers' classpath. The accumulo-site.xml may need to be on the mappers' classpath if HDFS or the Accumulo directory in HDFS is non-standard.
+   * 
+   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
+   * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
+   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
+   * 
+   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
+   * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server.
+   * 
+   * @param conf
+   *          the job
+   * @param scanOff
+   *          pass true to read offline tables
+   */
+  
+  public static void setScanOffline(Configuration conf, boolean scanOff) {
+    conf.setBoolean(READ_OFFLINE, scanOff);
+  }
+  
+  /**
    * Restricts the columns that will be mapped over for this configuration object.
    * 
    * @param conf
@@ -568,6 +602,12 @@ public abstract class InputFormatBase<K,
     return conf.getInt(MAX_VERSIONS, -1);
   }
   
+  protected static boolean isOfflineScan(Configuration conf) {
+    return conf.getBoolean(READ_OFFLINE, false);
+  }
+
+  // Return a list of the iterator settings (for iterators to apply to a scanner)
+
   /**
    * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
    * 
@@ -684,7 +724,12 @@ public abstract class InputFormatBase<K,
         Connector conn = instance.getConnector(user, password);
         log.debug("Creating scanner for table: " + getTablename(attempt.getConfiguration()));
         log.debug("Authorizations are: " + authorizations);
-        scanner = conn.createScanner(getTablename(attempt.getConfiguration()), authorizations);
+        if (isOfflineScan(attempt.getConfiguration())) {
+          scanner = new OfflineScanner(instance, new AuthInfo(user, ByteBuffer.wrap(password), instance.getInstanceID()), Tables.getTableId(instance,
+              getTablename(attempt.getConfiguration())), authorizations);
+        } else {
+          scanner = conn.createScanner(getTablename(attempt.getConfiguration()), authorizations);
+        }
         if (isIsolated(attempt.getConfiguration())) {
           log.info("Creating isolated scanner");
           scanner = new IsolatedScanner(scanner);
@@ -742,6 +787,106 @@ public abstract class InputFormatBase<K,
     }
   }
   
+  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext job, String tableName, List<Range> ranges) throws TableNotFoundException,
+      AccumuloException, AccumuloSecurityException {
+    
+    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+
+    Instance instance = getInstance(job.getConfiguration());
+    Connector conn = instance.getConnector(getUsername(job.getConfiguration()), getPassword(job.getConfiguration()));
+    String tableId = Tables.getTableId(instance, tableName);
+    
+    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+      Tables.clearCache(instance);
+      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+        throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
+      }
+    }
+
+    for (Range range : ranges) {
+      Text startRow;
+      
+      if (range.getStartKey() != null)
+        startRow = range.getStartKey().getRow();
+      else
+        startRow = new Text();
+      
+      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
+      Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+      ColumnFQ.fetch(scanner, Constants.METADATA_PREV_ROW_COLUMN);
+      scanner.fetchColumnFamily(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY);
+      scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
+      scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
+      scanner.setRange(metadataRange);
+      
+      RowIterator rowIter = new RowIterator(scanner);
+      
+      // TODO check that extents match prev extent
+
+      KeyExtent lastExtent = null;
+
+      while (rowIter.hasNext()) {
+        Iterator<Entry<Key,Value>> row = rowIter.next();
+        String last = "";
+        KeyExtent extent = null;
+        String location = null;
+        
+        while (row.hasNext()) {
+          Entry<Key,Value> entry = row.next();
+          Key key = entry.getKey();
+          
+          if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) {
+            last = entry.getValue().toString();
+          }
+          
+          if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)
+              || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
+            location = entry.getValue().toString();
+          }
+
+          if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
+            extent = new KeyExtent(key.getRow(), entry.getValue());
+          }
+          
+        }
+        
+        if (location != null)
+          return null;
+
+        if (!extent.getTableId().toString().equals(tableId)) {
+          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
+        }
+
+        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
+          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
+        }
+
+        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
+        if (tabletRanges == null) {
+          tabletRanges = new HashMap<KeyExtent,List<Range>>();
+          binnedRanges.put(last, tabletRanges);
+        }
+        
+        List<Range> rangeList = tabletRanges.get(extent);
+        if (rangeList == null) {
+          rangeList = new ArrayList<Range>();
+          tabletRanges.put(extent, rangeList);
+        }
+        
+        rangeList.add(range);
+
+        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
+          break;
+        }
+        
+        lastExtent = extent;
+      }
+
+    }
+    
+    return binnedRanges;
+  }
+
   /**
    * Read the metadata table to get tablets and match up ranges to them.
    */
@@ -762,10 +907,30 @@ public abstract class InputFormatBase<K,
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     TabletLocator tl;
     try {
-      tl = getTabletLocator(job.getConfiguration());
-      while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
-        log.warn("Unable to locate bins for specified ranges. Retrying.");
-        UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+      if (isOfflineScan(job.getConfiguration())) {
+        binnedRanges = binOfflineTable(job, tableName, ranges);
+        while (binnedRanges == null) {
+          // Some tablets were still online, try again
+          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+          binnedRanges = binOfflineTable(job, tableName, ranges);
+        }
+      } else {
+        Instance instance = getInstance(job.getConfiguration());
+        String tableId = null;
+        tl = getTabletLocator(job.getConfiguration());
+        while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
+          if (!(instance instanceof MockInstance)) {
+            if (tableId == null)
+              tableId = Tables.getTableId(instance, tableName);
+            if (!Tables.exists(instance, tableId))
+              throw new TableDeletedException(tableId);
+            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+              throw new TableOfflineException(instance, tableId);
+          }
+          binnedRanges.clear();
+          log.warn("Unable to locate bins for specified ranges. Retrying.");
+          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
+        }
       }
     } catch (Exception e) {
       throw new IOException(e);

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java Tue Mar  6 23:47:28 2012
@@ -33,8 +33,8 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
 import org.apache.accumulo.core.iterators.system.DeletingIterator;
@@ -106,7 +106,7 @@ public class MockScannerBase extends Sca
     AccumuloConfiguration conf = new MockConfiguration(table.settings);
     MockIteratorEnvironment iterEnv = new MockIteratorEnvironment();
     SortedKeyValueIterator<Key,Value> result = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, vf, null, conf,
-        serverSideIteratorList, serverSideIteratorOptions, iterEnv));
+        serverSideIteratorList, serverSideIteratorOptions, iterEnv, false));
     return result;
   }
   

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java Tue Mar  6 23:47:28 2012
@@ -185,6 +185,12 @@ public class IteratorUtil {
   public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
       SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
       IteratorEnvironment env) throws IOException {
+    return loadIterators(scope, source, extent, conf, ssiList, ssio, env, true);
+  }
+  
+  public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
+      SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+      IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
     List<IterInfo> iters = new ArrayList<IterInfo>(ssiList);
     Map<String,Map<String,String>> allOptions = new HashMap<String,Map<String,String>>();
     
@@ -201,18 +207,23 @@ public class IteratorUtil {
       }
     }
     
-    return loadIterators(source, iters, allOptions, env);
+    return loadIterators(source, iters, allOptions, env, useAccumuloClassLoader);
   }
   
+  @SuppressWarnings("unchecked")
   public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source,
-      Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env) throws IOException {
+      Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
     SortedKeyValueIterator<K,V> prev = source;
     
     try {
       for (IterInfo iterInfo : iters) {
-        @SuppressWarnings("unchecked")
-        Class<? extends SortedKeyValueIterator<K,V>> clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloClassLoader.loadClass(iterInfo.className,
-            SortedKeyValueIterator.class);
+       
+        Class<? extends SortedKeyValueIterator<K,V>> clazz;
+        if (useAccumuloClassLoader){
+          clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloClassLoader.loadClass(iterInfo.className, SortedKeyValueIterator.class);
+        }else{
+          clazz = (Class<? extends SortedKeyValueIterator<K,V>>) Class.forName(iterInfo.className).asSubclass(SortedKeyValueIterator.class);
+        }
         SortedKeyValueIterator<K,V> skvi = clazz.newInstance();
         
         Map<String,String> options = iterOpts.get(iterInfo.iterName);

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java Tue Mar  6 23:47:28 2012
@@ -18,8 +18,12 @@ package org.apache.accumulo.server.test.
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.data.Key;
@@ -130,10 +134,10 @@ public class ContinuousVerify extends Co
   }
   
   @Override
-  public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-    if (args.length != 8) {
+  public int run(String[] args) throws Exception {
+    if (args.length != 9) {
       throw new IllegalArgumentException("Usage : " + ContinuousVerify.class.getName()
-          + " <instance name> <zookeepers> <user> <pass> <table> <output dir> <max mappers> <num reducers>");
+          + " <instance name> <zookeepers> <user> <pass> <table> <output dir> <max mappers> <num reducers> <scan offline>");
     }
     
     String instance = args[0];
@@ -144,14 +148,26 @@ public class ContinuousVerify extends Co
     String outputdir = args[5];
     String maxMaps = args[6];
     String reducers = args[7];
+    boolean scanOffline = Boolean.parseBoolean(args[8]);
     
     Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
     job.setJarByClass(this.getClass());
     
+    String clone = table;
+    Connector conn = null;
+    if (scanOffline) {
+      Random random = new Random();
+      clone = table + "_" + String.format("%016x", Math.abs(random.nextLong()));
+      ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers);
+      conn = zki.getConnector(user, pass.getBytes());
+      conn.tableOperations().clone(table, clone, true, new HashMap<String,String>(), new HashSet<String>());
+      conn.tableOperations().offline(clone);
+    }
+
     job.setInputFormatClass(AccumuloInputFormat.class);
-    AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), table, new Authorizations());
+    AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), clone, new Authorizations());
     AccumuloInputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers);
-    
+    AccumuloInputFormat.setScanOffline(job.getConfiguration(), scanOffline);
     // set up ranges
     try {
       Set<Range> ranges = new ZooKeeperInstance(instance, zookeepers).getConnector(user, pass.getBytes()).tableOperations()
@@ -170,9 +186,17 @@ public class ContinuousVerify extends Co
     job.setNumReduceTasks(Integer.parseInt(reducers));
     
     job.setOutputFormatClass(TextOutputFormat.class);
+    
+    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", scanOffline);
+
     TextOutputFormat.setOutputPath(job, new Path(outputdir));
     
     job.waitForCompletion(true);
+    
+    if (scanOffline) {
+      conn.tableOperations().delete(clone);
+    }
+
     return job.isSuccessful() ? 0 : 1;
   }
   

Modified: incubator/accumulo/trunk/test/system/continuous/continuous-env.sh.example
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/test/system/continuous/continuous-env.sh.example?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/test/system/continuous/continuous-env.sh.example (original)
+++ incubator/accumulo/trunk/test/system/continuous/continuous-env.sh.example Tue Mar  6 23:47:28 2012
@@ -77,6 +77,7 @@ MASTER_RESTART_SLEEP_TIME=2
 VERFIY_OUT=/tmp/continuous_verify
 VERIFY_MAX_MAPS=64
 VERIFY_REDUCERS=64
+SCAN_OFFLINE=false
 
 #settings related to the batch walker
 BATCH_WALKER_SLEEP=180000

Modified: incubator/accumulo/trunk/test/system/continuous/run-verify.sh
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/test/system/continuous/run-verify.sh?rev=1297799&r1=1297798&r2=1297799&view=diff
==============================================================================
--- incubator/accumulo/trunk/test/system/continuous/run-verify.sh (original)
+++ incubator/accumulo/trunk/test/system/continuous/run-verify.sh Tue Mar  6 23:47:28 2012
@@ -18,5 +18,5 @@
 
 . mapred-setup.sh
 
-$ACCUMULO_HOME/bin/tool.sh "$SERVER_LIBJAR" org.apache.accumulo.server.test.continuous.ContinuousVerify -libjars "$SERVER_LIBJAR" $INSTANCE_NAME $ZOO_KEEPERS $USER $PASS $TABLE $VERFIY_OUT $VERIFY_MAX_MAPS $VERIFY_REDUCERS
+$ACCUMULO_HOME/bin/tool.sh "$SERVER_LIBJAR" org.apache.accumulo.server.test.continuous.ContinuousVerify -libjars "$SERVER_LIBJAR" $INSTANCE_NAME $ZOO_KEEPERS $USER $PASS $TABLE $VERFIY_OUT $VERIFY_MAX_MAPS $VERIFY_REDUCERS $SCAN_OFFLINE