You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/09/22 00:16:28 UTC

svn commit: r1388707 - in /accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands: DeleteCommand.java DeleteManyCommand.java GrepCommand.java InsertCommand.java ScanCommand.java

Author: kturner
Date: Fri Sep 21 22:16:28 2012
New Revision: 1388707

URL: http://svn.apache.org/viewvc?rev=1388707&view=rev
Log:
ACCUMULO-705 ACCUMULO-706 Added timeout option to shell commands that read and write data

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/GrepCommand.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java?rev=1388707&r1=1388706&r2=1388707&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java Fri Sep 21 22:16:28 2012
@@ -17,12 +17,14 @@
 package org.apache.accumulo.core.util.shell.commands;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
@@ -35,6 +37,15 @@ import org.apache.hadoop.io.Text;
 
 public class DeleteCommand extends Command {
   private Option deleteOptAuths, timestampOpt;
+  private Option timeoutOption;
+  
+  protected long getTimeout(final CommandLine cl) {
+    if (cl.hasOption(timeoutOption.getLongOpt())) {
+      return AccumuloConfiguration.getTimeInMillis(cl.getOptionValue(timeoutOption.getLongOpt()));
+    }
+    
+    return Long.MAX_VALUE;
+  }
   
   public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
       IOException, ConstraintViolationException {
@@ -57,7 +68,7 @@ public class DeleteCommand extends Comma
       m.putDelete(colf, colq);
     }
     final BatchWriter bw = shellState.getConnector().createBatchWriter(shellState.getTableName(),
-        new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1));
+        new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS));
     bw.addMutation(m);
     bw.close();
     return 0;
@@ -85,6 +96,11 @@ public class DeleteCommand extends Comma
     timestampOpt.setArgName("timestamp");
     o.addOption(timestampOpt);
     
+    timeoutOption = new Option(null, "timeout", true,
+        "time before delete should fail if no data is written. If no unit is given assumes seconds.  Units d,h,m,s,and ms are supported.  e.g. 30s or 100ms");
+    timeoutOption.setArgName("timeout");
+    o.addOption(timeoutOption);
+
     return o;
   }
   

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java?rev=1388707&r1=1388706&r2=1388707&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java Fri Sep 21 22:16:28 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.core.util.shell.commands;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -47,11 +49,14 @@ public class DeleteManyCommand extends S
     // handle remaining optional arguments
     scanner.setRange(getRange(cl, interpeter));
     
+    scanner.setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS);
+
     // handle columns
     fetchColumns(cl, scanner, interpeter);
     
     // output / delete the records
-    final BatchWriter writer = shellState.getConnector().createBatchWriter(tableName, new BatchWriterConfig());
+    final BatchWriter writer = shellState.getConnector()
+        .createBatchWriter(tableName, new BatchWriterConfig().setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS));
     shellState.printLines(new DeleterFormatter(writer, scanner, cl.hasOption(timestampOpt.getOpt()), shellState, cl.hasOption(forceOpt.getOpt())), false);
     
     return 0;

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/GrepCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/GrepCommand.java?rev=1388707&r1=1388706&r2=1388707&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/GrepCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/GrepCommand.java Fri Sep 21 22:16:28 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.core.util.sh
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -55,6 +56,8 @@ public class GrepCommand extends ScanCom
     final BatchScanner scanner = shellState.getConnector().createBatchScanner(tableName, auths, numThreads);
     scanner.setRanges(Collections.singletonList(getRange(cl, interpeter)));
     
+    scanner.setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS);
+
     for (int i = 0; i < cl.getArgs().length; i++) {
       setUpIterator(Integer.MAX_VALUE - cl.getArgs().length + i, "grep" + i, cl.getArgs()[i], scanner);
     }    

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java?rev=1388707&r1=1388706&r2=1388707&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java Fri Sep 21 22:16:28 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.core.util.sh
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -25,6 +26,7 @@ import org.apache.accumulo.core.client.B
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.ConstraintViolationSummary;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
@@ -40,7 +42,16 @@ import org.apache.hadoop.io.Text;
 
 public class InsertCommand extends Command {
   private Option insertOptAuths, timestampOpt;
+  private Option timeoutOption;
   
+  protected long getTimeout(final CommandLine cl) {
+    if (cl.hasOption(timeoutOption.getLongOpt())) {
+      return AccumuloConfiguration.getTimeInMillis(cl.getOptionValue(timeoutOption.getLongOpt()));
+    }
+    
+    return Long.MAX_VALUE;
+  }
+
   public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
       IOException, ConstraintViolationException {
     shellState.checkTableState();
@@ -64,7 +75,7 @@ public class InsertCommand extends Comma
       m.put(colf, colq, val);
     
     final BatchWriter bw = shellState.getConnector().createBatchWriter(shellState.getTableName(),
-        new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1));
+        new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS));
     bw.addMutation(m);
     try {
       bw.close();
@@ -108,6 +119,11 @@ public class InsertCommand extends Comma
     timestampOpt.setArgName("timestamp");
     o.addOption(timestampOpt);
     
+    timeoutOption = new Option(null, "timeout", true,
+        "time before insert should fail if no data is written. If no unit is given assumes seconds.  Units d,h,m,s,and ms are supported.  e.g. 30s or 100ms");
+    timeoutOption.setArgName("timeout");
+    o.addOption(timeoutOption);
+
     return o;
   }
   

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java?rev=1388707&r1=1388706&r2=1388707&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java Fri Sep 21 22:16:28 2012
@@ -20,12 +20,14 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
@@ -48,6 +50,7 @@ public class ScanCommand extends Command
   protected Option timestampOpt;
   private Option optStartRowExclusive;
   private Option optEndRowExclusive;
+  private Option timeoutOption;
   
   public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception {
     final String tableName = OptUtil.getTableOpt(cl, shellState);
@@ -69,6 +72,9 @@ public class ScanCommand extends Command
     // handle columns
     fetchColumns(cl, scanner, interpeter);
     
+    // set timeout
+    scanner.setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS);
+
     // output the records
     if (cl.hasOption(showFewOpt.getOpt())) {
       final String showLength = cl.getOptionValue(showFewOpt.getOpt());
@@ -92,6 +98,14 @@ public class ScanCommand extends Command
     return 0;
   }
   
+  protected long getTimeout(final CommandLine cl) {
+    if (cl.hasOption(timeoutOption.getLongOpt())) {
+      return AccumuloConfiguration.getTimeInMillis(cl.getOptionValue(timeoutOption.getLongOpt()));
+    }
+    
+    return Long.MAX_VALUE;
+  }
+
   protected void addScanIterators(final Shell shellState, final Scanner scanner, final String tableName) {
     final List<IteratorSetting> tableScanIterators = shellState.scanIteratorOptions.get(shellState.getTableName());
     if (tableScanIterators == null) {
@@ -223,6 +237,8 @@ public class ScanCommand extends Command
     formatterOpt = new Option("fm", "formatter", true, "fully qualified name of the formatter class to use");
     interpreterOpt = new Option("i", "interpreter", true, "fully qualified name of the interpreter class to use");
     formatterInterpeterOpt = new Option("fi", "fmt-interpreter", true, "fully qualified name of a class that is a formatter and interpreter");
+    timeoutOption = new Option(null, "timeout", true,
+        "time before scan should fail if no data is returned. If no unit is given assumes seconds.  Units d,h,m,s,and ms are supported.  e.g. 30s or 100ms");
     
     scanOptAuths.setArgName("comma-separated-authorizations");
     scanOptRow.setArgName("row");
@@ -230,6 +246,7 @@ public class ScanCommand extends Command
     showFewOpt.setRequired(false);
     showFewOpt.setArgName("int");
     formatterOpt.setArgName("className");
+    timeoutOption.setArgName("timeout");
     
     o.addOption(scanOptAuths);
     o.addOption(scanOptRow);
@@ -245,6 +262,7 @@ public class ScanCommand extends Command
     o.addOption(formatterOpt);
     o.addOption(interpreterOpt);
     o.addOption(formatterInterpeterOpt);
+    o.addOption(timeoutOption);
     
     return o;
   }