You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/09/22 00:16:28 UTC
svn commit: r1388707 - in
/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands:
DeleteCommand.java DeleteManyCommand.java GrepCommand.java
InsertCommand.java ScanCommand.java
Author: kturner
Date: Fri Sep 21 22:16:28 2012
New Revision: 1388707
URL: http://svn.apache.org/viewvc?rev=1388707&view=rev
Log:
ACCUMULO-705 ACCUMULO-706 Added timeout option to shell commands that read and write data
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/GrepCommand.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java?rev=1388707&r1=1388706&r2=1388707&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java Fri Sep 21 22:16:28 2012
@@ -17,12 +17,14 @@
package org.apache.accumulo.core.util.shell.commands;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
@@ -35,6 +37,15 @@ import org.apache.hadoop.io.Text;
public class DeleteCommand extends Command {
private Option deleteOptAuths, timestampOpt;
+ private Option timeoutOption;
+
+ protected long getTimeout(final CommandLine cl) {
+ if (cl.hasOption(timeoutOption.getLongOpt())) {
+ return AccumuloConfiguration.getTimeInMillis(cl.getOptionValue(timeoutOption.getLongOpt()));
+ }
+
+ return Long.MAX_VALUE;
+ }
public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
IOException, ConstraintViolationException {
@@ -57,7 +68,7 @@ public class DeleteCommand extends Comma
m.putDelete(colf, colq);
}
final BatchWriter bw = shellState.getConnector().createBatchWriter(shellState.getTableName(),
- new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1));
+ new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS));
bw.addMutation(m);
bw.close();
return 0;
@@ -85,6 +96,11 @@ public class DeleteCommand extends Comma
timestampOpt.setArgName("timestamp");
o.addOption(timestampOpt);
+ timeoutOption = new Option(null, "timeout", true,
+ "time before insert should fail if no data is written. If no unit is given assumes seconds. Units d,h,m,s,and ms are supported. e.g. 30s or 100ms");
+ timeoutOption.setArgName("timeout");
+ o.addOption(timeoutOption);
+
return o;
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java?rev=1388707&r1=1388706&r2=1388707&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java Fri Sep 21 22:16:28 2012
@@ -16,6 +16,8 @@
*/
package org.apache.accumulo.core.util.shell.commands;
+import java.util.concurrent.TimeUnit;
+
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.IteratorSetting;
@@ -47,11 +49,14 @@ public class DeleteManyCommand extends S
// handle remaining optional arguments
scanner.setRange(getRange(cl, interpeter));
+ scanner.setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS);
+
// handle columns
fetchColumns(cl, scanner, interpeter);
// output / delete the records
- final BatchWriter writer = shellState.getConnector().createBatchWriter(tableName, new BatchWriterConfig());
+ final BatchWriter writer = shellState.getConnector()
+ .createBatchWriter(tableName, new BatchWriterConfig().setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS));
shellState.printLines(new DeleterFormatter(writer, scanner, cl.hasOption(timestampOpt.getOpt()), shellState, cl.hasOption(forceOpt.getOpt())), false);
return 0;
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/GrepCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/GrepCommand.java?rev=1388707&r1=1388706&r2=1388707&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/GrepCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/GrepCommand.java Fri Sep 21 22:16:28 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.core.util.sh
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IteratorSetting;
@@ -55,6 +56,8 @@ public class GrepCommand extends ScanCom
final BatchScanner scanner = shellState.getConnector().createBatchScanner(tableName, auths, numThreads);
scanner.setRanges(Collections.singletonList(getRange(cl, interpeter)));
+ scanner.setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS);
+
for (int i = 0; i < cl.getArgs().length; i++) {
setUpIterator(Integer.MAX_VALUE - cl.getArgs().length + i, "grep" + i, cl.getArgs()[i], scanner);
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java?rev=1388707&r1=1388706&r2=1388707&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java Fri Sep 21 22:16:28 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.core.util.sh
import java.io.IOException;
import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -25,6 +26,7 @@ import org.apache.accumulo.core.client.B
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ConstraintViolationSummary;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
@@ -40,7 +42,16 @@ import org.apache.hadoop.io.Text;
public class InsertCommand extends Command {
private Option insertOptAuths, timestampOpt;
+ private Option timeoutOption;
+ protected long getTimeout(final CommandLine cl) {
+ if (cl.hasOption(timeoutOption.getLongOpt())) {
+ return AccumuloConfiguration.getTimeInMillis(cl.getOptionValue(timeoutOption.getLongOpt()));
+ }
+
+ return Long.MAX_VALUE;
+ }
+
public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
IOException, ConstraintViolationException {
shellState.checkTableState();
@@ -64,7 +75,7 @@ public class InsertCommand extends Comma
m.put(colf, colq, val);
final BatchWriter bw = shellState.getConnector().createBatchWriter(shellState.getTableName(),
- new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1));
+ new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS));
bw.addMutation(m);
try {
bw.close();
@@ -108,6 +119,11 @@ public class InsertCommand extends Comma
timestampOpt.setArgName("timestamp");
o.addOption(timestampOpt);
+ timeoutOption = new Option(null, "timeout", true,
+ "time before insert should fail if no data is written. If no unit is given assumes seconds. Units d,h,m,s,and ms are supported. e.g. 30s or 100ms");
+ timeoutOption.setArgName("timeout");
+ o.addOption(timeoutOption);
+
return o;
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java?rev=1388707&r1=1388706&r2=1388707&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java Fri Sep 21 22:16:28 2012
@@ -20,12 +20,14 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
@@ -48,6 +50,7 @@ public class ScanCommand extends Command
protected Option timestampOpt;
private Option optStartRowExclusive;
private Option optEndRowExclusive;
+ private Option timeoutOption;
public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception {
final String tableName = OptUtil.getTableOpt(cl, shellState);
@@ -69,6 +72,9 @@ public class ScanCommand extends Command
// handle columns
fetchColumns(cl, scanner, interpeter);
+ // set timeout
+ scanner.setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS);
+
// output the records
if (cl.hasOption(showFewOpt.getOpt())) {
final String showLength = cl.getOptionValue(showFewOpt.getOpt());
@@ -92,6 +98,14 @@ public class ScanCommand extends Command
return 0;
}
+ protected long getTimeout(final CommandLine cl) {
+ if (cl.hasOption(timeoutOption.getLongOpt())) {
+ return AccumuloConfiguration.getTimeInMillis(cl.getOptionValue(timeoutOption.getLongOpt()));
+ }
+
+ return Long.MAX_VALUE;
+ }
+
protected void addScanIterators(final Shell shellState, final Scanner scanner, final String tableName) {
final List<IteratorSetting> tableScanIterators = shellState.scanIteratorOptions.get(shellState.getTableName());
if (tableScanIterators == null) {
@@ -223,6 +237,8 @@ public class ScanCommand extends Command
formatterOpt = new Option("fm", "formatter", true, "fully qualified name of the formatter class to use");
interpreterOpt = new Option("i", "interpreter", true, "fully qualified name of the interpreter class to use");
formatterInterpeterOpt = new Option("fi", "fmt-interpreter", true, "fully qualified name of a class that is a formatter and interpreter");
+ timeoutOption = new Option(null, "timeout", true,
+ "time before scan should fail if no data is returned. If no unit is given assumes seconds. Units d,h,m,s,and ms are supported. e.g. 30s or 100ms");
scanOptAuths.setArgName("comma-separated-authorizations");
scanOptRow.setArgName("row");
@@ -230,6 +246,7 @@ public class ScanCommand extends Command
showFewOpt.setRequired(false);
showFewOpt.setArgName("int");
formatterOpt.setArgName("className");
+ timeoutOption.setArgName("timeout");
o.addOption(scanOptAuths);
o.addOption(scanOptRow);
@@ -245,6 +262,7 @@ public class ScanCommand extends Command
o.addOption(formatterOpt);
o.addOption(interpreterOpt);
o.addOption(formatterInterpeterOpt);
+ o.addOption(timeoutOption);
return o;
}