You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/09/05 23:17:33 UTC
[17/18] git commit: ACCUMULO-1957 test/whitespace cleanup
ACCUMULO-1957 test/whitespace cleanup
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e0fe2ae6
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e0fe2ae6
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e0fe2ae6
Branch: refs/heads/master
Commit: e0fe2ae61d19108521047fb56dd11044ddbb3894
Parents: c56e300
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Sep 3 10:32:50 2014 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Sep 5 17:16:59 2014 -0400
----------------------------------------------------------------------
.../accumulo/core/client/BatchWriterConfig.java | 32 ++++++------
.../core/client/ConditionalWriterConfig.java | 40 +++++++--------
.../apache/accumulo/core/client/Durability.java | 4 +-
.../apache/accumulo/tserver/TabletServer.java | 12 ++---
.../test/functional/SessionDurabilityIT.java | 53 +++++++++++++++++++-
5 files changed, 96 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0fe2ae6/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java b/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java
index 1897552..270a89e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.util.StringUtils;
/**
* This object holds configuration settings used to instantiate a {@link BatchWriter}
- *
+ *
* @since 1.5.0
*/
public class BatchWriterConfig implements Writable {
@@ -46,16 +46,16 @@ public class BatchWriterConfig implements Writable {
private static final Integer DEFAULT_MAX_WRITE_THREADS = 3;
private Integer maxWriteThreads = null;
-
+
private Durability durability = Durability.DEFAULT;
/**
* Sets the maximum memory to batch before writing. The smaller this value, the more frequently the {@link BatchWriter} will write.<br />
* If set to a value smaller than a single mutation, then it will {@link BatchWriter#flush()} after each added mutation. Must be non-negative.
- *
+ *
* <p>
* <b>Default:</b> 50M
- *
+ *
* @param maxMemory
* max size in bytes
* @throws IllegalArgumentException
@@ -72,15 +72,15 @@ public class BatchWriterConfig implements Writable {
/**
* Sets the maximum amount of time to hold the data in memory before flushing it to servers.<br />
* For no maximum, set to zero, or {@link Long#MAX_VALUE} with {@link TimeUnit#MILLISECONDS}.
- *
+ *
* <p>
* {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be truncated to the nearest {@link TimeUnit#MILLISECONDS}.<br />
* If this truncation would result in making the value zero when it was specified as non-zero, then a minimum value of one {@link TimeUnit#MILLISECONDS} will
* be used.
- *
+ *
* <p>
* <b>Default:</b> 120 seconds
- *
+ *
* @param maxLatency
* the maximum latency, in the unit specified by the value of {@code timeUnit}
* @param timeUnit
@@ -104,15 +104,15 @@ public class BatchWriterConfig implements Writable {
/**
* Sets the maximum amount of time an unresponsive server will be re-tried. When this timeout is exceeded, the {@link BatchWriter} should throw an exception.<br />
* For no timeout, set to zero, or {@link Long#MAX_VALUE} with {@link TimeUnit#MILLISECONDS}.
- *
+ *
* <p>
* {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be truncated to the nearest {@link TimeUnit#MILLISECONDS}.<br />
* If this truncation would result in making the value zero when it was specified as non-zero, then a minimum value of one {@link TimeUnit#MILLISECONDS} will
* be used.
- *
+ *
* <p>
* <b>Default:</b> {@link Long#MAX_VALUE} (no timeout)
- *
+ *
* @param timeout
* the timeout, in the unit specified by the value of {@code timeUnit}
* @param timeUnit
@@ -135,10 +135,10 @@ public class BatchWriterConfig implements Writable {
/**
* Sets the maximum number of threads to use for writing data to the tablet servers.
- *
+ *
* <p>
* <b>Default:</b> 3
- *
+ *
* @param maxWriteThreads
* the maximum threads to use
* @throws IllegalArgumentException
@@ -168,7 +168,7 @@ public class BatchWriterConfig implements Writable {
public int getMaxWriteThreads() {
return maxWriteThreads != null ? maxWriteThreads : DEFAULT_MAX_WRITE_THREADS;
}
-
+
/**
* @since 1.7.0
* @return the durability to be used by the BatchWriter
@@ -176,16 +176,16 @@ public class BatchWriterConfig implements Writable {
public Durability getDurability() {
return durability;
}
-
+
/**
* @param durability the Durability to be used by the BatchWriter
* @since 1.7.0
- *
+ *
*/
public void setDurability(Durability durability) {
this.durability = durability;
}
-
+
@Override
public void write(DataOutput out) throws IOException {
// write this out in a human-readable way
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0fe2ae6/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
index 7bf568f..1280abd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
@@ -23,26 +23,26 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.security.Authorizations;
/**
- *
+ *
* @since 1.6.0
*/
public class ConditionalWriterConfig {
-
+
private static final Long DEFAULT_TIMEOUT = Long.MAX_VALUE;
private Long timeout = null;
-
+
private static final Integer DEFAULT_MAX_WRITE_THREADS = 3;
private Integer maxWriteThreads = null;
-
+
private Authorizations auths = Authorizations.EMPTY;
-
+
private Durability durability = Durability.DEFAULT;
-
+
/**
* A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in must be
* a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are passed, then an
* exception will be thrown.
- *
+ *
* <p>
* Any condition that is not visible with this set of authorizations will fail.
*/
@@ -51,20 +51,20 @@ public class ConditionalWriterConfig {
this.auths = auths;
return this;
}
-
+
/**
* Sets the maximum amount of time an unresponsive server will be re-tried. When this timeout is exceeded, the {@link ConditionalWriter} should return the
* mutation with an exception.<br />
* For no timeout, set to zero, or {@link Long#MAX_VALUE} with {@link TimeUnit#MILLISECONDS}.
- *
+ *
* <p>
* {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be truncated to the nearest {@link TimeUnit#MILLISECONDS}.<br />
* If this truncation would result in making the value zero when it was specified as non-zero, then a minimum value of one {@link TimeUnit#MILLISECONDS} will
* be used.
- *
+ *
* <p>
* <b>Default:</b> {@link Long#MAX_VALUE} (no timeout)
- *
+ *
* @param timeout
* the timeout, in the unit specified by the value of {@code timeUnit}
* @param timeUnit
@@ -76,7 +76,7 @@ public class ConditionalWriterConfig {
public ConditionalWriterConfig setTimeout(long timeout, TimeUnit timeUnit) {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout not allowed " + timeout);
-
+
if (timeout == 0)
this.timeout = Long.MAX_VALUE;
else
@@ -84,13 +84,13 @@ public class ConditionalWriterConfig {
this.timeout = Math.max(1, timeUnit.toMillis(timeout));
return this;
}
-
+
/**
* Sets the maximum number of threads to use for writing data to the tablet servers.
- *
+ *
* <p>
* <b>Default:</b> 3
- *
+ *
* @param maxWriteThreads
* the maximum threads to use
* @throws IllegalArgumentException
@@ -100,11 +100,11 @@ public class ConditionalWriterConfig {
public ConditionalWriterConfig setMaxWriteThreads(int maxWriteThreads) {
if (maxWriteThreads <= 0)
throw new IllegalArgumentException("Max threads must be positive " + maxWriteThreads);
-
+
this.maxWriteThreads = maxWriteThreads;
return this;
}
-
+
/**
* Sets the Durability for the mutation, if applied.
* <p>
@@ -117,7 +117,7 @@ public class ConditionalWriterConfig {
this.durability = durability;
return this;
}
-
+
public Authorizations getAuthorizations() {
return auths;
}
@@ -125,11 +125,11 @@ public class ConditionalWriterConfig {
public long getTimeout(TimeUnit timeUnit) {
return timeUnit.convert(timeout != null ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
}
-
+
public int getMaxWriteThreads() {
return maxWriteThreads != null ? maxWriteThreads : DEFAULT_MAX_WRITE_THREADS;
}
-
+
public Durability getDurability() {
return durability;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0fe2ae6/core/src/main/java/org/apache/accumulo/core/client/Durability.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Durability.java b/core/src/main/java/org/apache/accumulo/core/client/Durability.java
index 8f85aa2..e1dbf4a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Durability.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Durability.java
@@ -4,7 +4,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TDurability;
/**
* The value for the durability of a BatchWriter or ConditionalWriter.
- * @since 1.7.0
+ * @since 1.7.0
*/
public enum Durability {
// Note, the order of these is important; the "highest" Durability is used in group commits.
@@ -44,7 +44,7 @@ public enum Durability {
return TDurability.NONE;
}
}
-
+
// for internal use only
static public Durability fromString(String value) {
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0fe2ae6/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 57de347..8a9c510 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -261,7 +261,7 @@ public class TabletServer implements Runnable {
private ReplicationWorker replWorker = null;
private final TabletStatsKeeper statsKeeper;
private final AtomicInteger logIdGenerator = new AtomicInteger();
-
+
private final VolumeManager fs;
public Instance getInstance() {
return serverConfig.getInstance();
@@ -297,7 +297,7 @@ public class TabletServer implements Runnable {
private String lockID;
public static final AtomicLong seekCount = new AtomicLong(0);
-
+
private final AtomicLong totalMinorCompactions = new AtomicLong(0);
public TabletServer(ServerConfigurationFactory conf, VolumeManager fs) {
@@ -739,7 +739,7 @@ public class TabletServer implements Runnable {
sessionManager.unreserveSession(us);
}
}
-
+
private void flush(UpdateSession us) {
int mutationCount = 0;
@@ -860,8 +860,8 @@ public class TabletServer implements Runnable {
if (us.currentTablet != null && extent == us.currentTablet.getExtent()) {
// because constraint violations may filter out some
- // mutations, for proper accounting with the client code,
- // need to increment the count based on the original
+ // mutations, for proper accounting with the client code,
+ // need to increment the count based on the original
// number of mutations from the client NOT the filtered number
us.successfulCommits.increment(us.currentTablet, us.queuedMutations.get(us.currentTablet).size());
}
@@ -1101,7 +1101,7 @@ public class TabletServer implements Runnable {
if (mutations.size() > 0) {
CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
-
+
if (cs == null) {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0fe2ae6/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java b/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
index d777ee4..b0d0b23 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
@@ -5,9 +5,14 @@ import static org.junit.Assert.assertTrue;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.minicluster.ServerType;
@@ -66,7 +71,7 @@ public class SessionDurabilityIT extends ConfigurableMacIT {
private void writeSome(String tableName, int n, BatchWriterConfig cfg) throws Exception {
Connector c = getConnector();
BatchWriter bw = c.createBatchWriter(tableName, cfg);
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < n; i++) {
Mutation m = new Mutation(i + "");
m.put("", "", "");
bw.addMutation(m);
@@ -74,6 +79,52 @@ public class SessionDurabilityIT extends ConfigurableMacIT {
bw.close();
}
+ @Test(timeout = 3 * 60 * 1000)
+ public void testConditionDurability() throws Exception {
+ Connector c = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ // table default is durable writes
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync");
+ // write without durability
+ ConditionalWriterConfig cfg = new ConditionalWriterConfig();
+ cfg.setDurability(Durability.NONE);
+ conditionWriteSome(tableName, 10, cfg);
+ // everything in there?
+ assertEquals(10, count(tableName));
+ // restart the server and verify the updates are lost
+ restartTServer();
+ assertEquals(0, count(tableName));
+ }
+
+ @Test(timeout = 3 * 60 * 1000)
+ public void testConditionDurability2() throws Exception {
+ Connector c = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ // table default is durable writes
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none");
+ // write with durability
+ ConditionalWriterConfig cfg = new ConditionalWriterConfig();
+ cfg.setDurability(Durability.SYNC);
+ conditionWriteSome(tableName, 10, cfg);
+ // everything in there?
+ assertEquals(10, count(tableName));
+ // restart the server and verify the updates are still there
+ restartTServer();
+ assertEquals(10, count(tableName));
+ }
+
+ private void conditionWriteSome(String tableName, int n, ConditionalWriterConfig cfg) throws Exception {
+ Connector c = getConnector();
+ ConditionalWriter cw = c.createConditionalWriter(tableName, cfg);
+ for (int i = 0; i < n; i++) {
+ ConditionalMutation m = new ConditionalMutation((CharSequence)(i + ""), new Condition("", ""));
+ m.put("", "", "X");
+ assertEquals(Status.ACCEPTED, cw.write(m).getStatus());
+ }
+ }
+
private void restartTServer() throws Exception {
for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
cluster.killProcess(ServerType.TABLET_SERVER, proc);