You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/09/05 23:17:21 UTC
[05/18] git commit: ACCUMULO-1957 per-table durability settings
ACCUMULO-1957 per-table durability settings
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e3aa7eac
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e3aa7eac
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e3aa7eac
Branch: refs/heads/master
Commit: e3aa7eac9ee5a8ac79b481cae5d8a47d62a104b5
Parents: 17f6250
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Aug 27 15:29:32 2014 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Sep 5 17:16:58 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/core/conf/Property.java | 13 +-
.../apache/accumulo/core/conf/PropertyType.java | 2 +
.../apache/accumulo/server/init/Initialize.java | 2 +-
.../accumulo/tserver/TabletMutations.java | 10 +-
.../apache/accumulo/tserver/TabletServer.java | 3 +-
.../apache/accumulo/tserver/log/DfsLogger.java | 65 +++++---
.../tserver/log/TabletServerLogger.java | 19 +--
.../accumulo/tserver/tablet/CommitSession.java | 4 +-
.../accumulo/tserver/tablet/Durability.java | 33 ++++
.../apache/accumulo/tserver/tablet/Tablet.java | 4 +-
.../tserver/tablet/TabletCommitter.java | 2 +-
.../accumulo/test/functional/DurabilityIT.java | 163 +++++++++++++++++++
12 files changed, 275 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 9837867..72d9aa1 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -276,10 +276,8 @@ public enum Property {
"The number of threads for the distributed work queue. These threads are used for copying failed bulk files."),
TSERV_WAL_SYNC("tserver.wal.sync", "true", PropertyType.BOOLEAN,
"Use the SYNC_BLOCK create flag to sync WAL writes to disk. Prevents problems recovering from sudden system resets."),
- TSERV_WAL_SYNC_METHOD("tserver.wal.sync.method", "hsync", PropertyType.STRING, "The method to invoke when sync'ing WALs. HSync will provide " +
- "resiliency in the face of unexpected power outages, at the cost of speed. If method is not available, the legacy 'sync' method " +
- "will be used to ensure backwards compatibility with older Hadoop versions. A value of 'hflush' is the alternative to the default value " +
- "of 'hsync' which will result in faster writes, but with less durability"),
+ @Deprecated
+ TSERV_WAL_SYNC_METHOD("tserver.wal.sync.method", "hsync", PropertyType.STRING, "This property is deprecated. Use table.durability instead."),
TSERV_REPLICATION_REPLAYERS("tserver.replication.replayer.", null, PropertyType.PREFIX, "Allows configuration of implementation used to apply replicated data"),
TSERV_REPLICATION_DEFAULT_HANDLER("tserver.replication.default.replayer", "org.apache.accumulo.tserver.replication.BatchWriterReplicationReplayer",
PropertyType.CLASSNAME, "Default AccumuloReplicationReplayer implementation"),
@@ -379,7 +377,8 @@ public enum Property {
"Determines the max # of files each tablet in a table can have. When adjusting this property you may want to consider adjusting"
+ " table.compaction.major.ratio also. Setting this property to 0 will make it default to tserver.scan.files.open.max-1, this will prevent a"
+ " tablet from having more files than can be opened. Setting this property low may throttle ingest and increase query performance."),
- TABLE_WALOG_ENABLED("table.walog.enabled", "true", PropertyType.BOOLEAN, "Use the write-ahead log to prevent the loss of data."),
+ @Deprecated
+ TABLE_WALOG_ENABLED("table.walog.enabled", "true", PropertyType.BOOLEAN, "This setting is deprecated. Use table.durability=none instead."),
TABLE_BLOOM_ENABLED("table.bloom.enabled", "false", PropertyType.BOOLEAN, "Use bloom filters on this table."),
TABLE_BLOOM_LOAD_THRESHOLD("table.bloom.load.threshold", "1", PropertyType.COUNT,
"This number of seeks that would actually use a bloom filter must occur before a file's bloom filter is loaded."
@@ -391,6 +390,10 @@ public enum Property {
+ ",org.apache.accumulo.core.file.keyfunctor.ColumnFamilyFunctor, and org.apache.accumulo.core.file.keyfunctor.ColumnQualifierFunctor are"
+ " allowable values. One can extend any of the above mentioned classes to perform specialized parsing of the key. "),
TABLE_BLOOM_HASHTYPE("table.bloom.hash.type", "murmur", PropertyType.STRING, "The bloom filter hash type"),
+ TABLE_DURABILITY("table.durability", "sync", PropertyType.DURABILITY, "The durability used to write to the write-ahead log." +
+ " Legal values are: none, which skips the write-ahead log; " +
+ "flush, which pushes data to the file system; and " +
+ "sync, which ensures the data is written to disk."),
TABLE_FAILURES_IGNORE("table.failures.ignore", "false", PropertyType.BOOLEAN,
"If you want queries for your table to hang or fail when data is missing from the system, "
+ "then set this to false. When this set to true missing data will be reported but queries "
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index f39a8bd..5d5dd5f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@ -67,6 +67,8 @@ public enum PropertyType {
CLASSNAME("java class", "[\\w$.]*", "A fully qualified java class name representing a class on the classpath.\n"
+ "An example is 'java.lang.String', rather than 'String'"),
+
+ DURABILITY("durability", "(?:none|log|flush|sync)", "One of 'none', 'flush' or 'sync'."),
STRING("string", ".*",
"An arbitrary string of characters whose format is unspecified and interpreted based on the context of the property to which it applies."), BOOLEAN(
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 9b952ba..5f1e287 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -130,7 +130,7 @@ public class Initialize {
static {
initialMetadataConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K");
initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5");
- initialMetadataConf.put(Property.TABLE_WALOG_ENABLED.getKey(), "true");
+ initialMetadataConf.put(Property.TABLE_DURABILITY.getKey(), "sync");
initialMetadataConf.put(Property.TABLE_MAJC_RATIO.getKey(), "1");
initialMetadataConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "64M");
initialMetadataConf.put(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", MetadataConstraints.class.getName());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
index e814f0e..a30fa02 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
@@ -19,16 +19,19 @@ package org.apache.accumulo.tserver;
import java.util.List;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.tserver.tablet.Durability;
public class TabletMutations {
private final int tid;
private final int seq;
private final List<Mutation> mutations;
+ private final Durability durability;
- public TabletMutations(int tid, int seq, List<Mutation> mutations) {
+ public TabletMutations(int tid, int seq, List<Mutation> mutations, Durability durability) {
this.tid = tid;
this.seq = seq;
this.mutations = mutations;
+ this.durability = durability;
}
public List<Mutation> getMutations() {
@@ -38,10 +41,13 @@ public class TabletMutations {
public int getTid() {
return tid;
}
+
public int getSeq() {
return seq;
}
-
+ public Durability getDurability() {
+ return durability;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 57e3dee..63bf4a3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -215,6 +215,7 @@ import org.apache.accumulo.tserver.tablet.CommitSession;
import org.apache.accumulo.tserver.tablet.CompactionInfo;
import org.apache.accumulo.tserver.tablet.CompactionWatcher;
import org.apache.accumulo.tserver.tablet.Compactor;
+import org.apache.accumulo.tserver.tablet.Durability;
import org.apache.accumulo.tserver.tablet.KVEntry;
import org.apache.accumulo.tserver.tablet.ScanBatch;
import org.apache.accumulo.tserver.tablet.Scanner;
@@ -2899,7 +2900,7 @@ public class TabletServer implements Runnable {
public int createLogId(KeyExtent tablet) {
AccumuloConfiguration acuTableConf = getTableConfiguration(tablet);
- if (acuTableConf.getBoolean(Property.TABLE_WALOG_ENABLED)) {
+ if (Durability.fromString(acuTableConf.get(Property.TABLE_DURABILITY)) != Durability.NONE) {
return logIdGenerator.incrementAndGet();
}
return -1;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index c01e54a..d907ee7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -59,6 +59,7 @@ import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.tserver.TabletMutations;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.accumulo.tserver.tablet.Durability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -124,7 +125,7 @@ public class DfsLogger {
private final Object closeLock = new Object();
- private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null);
+ private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null, Durability.FLUSH);
private static final LogFileValue EMPTY = new LogFileValue();
@@ -145,9 +146,31 @@ public class DfsLogger {
continue;
}
workQueue.drainTo(work);
+
+ Method durabilityMethod = null;
+ loop:
+ for (LogWork logWork : work) {
+ switch (logWork.durability) {
+ case NONE:
+ // shouldn't make it to the work queue
+ break;
+ case LOG:
+ // do nothing
+ break;
+ case SYNC:
+ durabilityMethod = sync;
+ break loop;
+ case FLUSH:
+ if (durabilityMethod == null) {
+ durabilityMethod = flush;
+ }
+ break;
+ }
+ }
try {
- sync.invoke(logFile);
+ if (durabilityMethod != null)
+ durabilityMethod.invoke(logFile);
} catch (Exception ex) {
log.warn("Exception syncing " + ex);
for (DfsLogger.LogWork logWork : work) {
@@ -165,11 +188,13 @@ public class DfsLogger {
}
static class LogWork {
- CountDownLatch latch;
+ final CountDownLatch latch;
+ final Durability durability;
volatile Exception exception;
- public LogWork(CountDownLatch latch) {
+ public LogWork(CountDownLatch latch, Durability durability) {
this.latch = latch;
+ this.durability = durability;
}
}
@@ -213,11 +238,12 @@ public class DfsLogger {
// filename is unique
return getFileName().hashCode();
}
-
+
private final ServerResources conf;
private FSDataOutputStream logFile;
private DataOutputStream encryptingLogFile = null;
private Method sync;
+ private Method flush;
private String logPath;
private Daemon syncThread;
@@ -337,16 +363,13 @@ public class DfsLogger {
else
logFile = fs.create(new Path(logPath), true, 0, replication, blockSize);
- String syncMethod = conf.getConfiguration().get(Property.TSERV_WAL_SYNC_METHOD);
try {
- // hsync: send data to datanodes and sync the data to disk
- sync = logFile.getClass().getMethod(syncMethod);
+ sync = logFile.getClass().getMethod("hsync");
+ flush = logFile.getClass().getMethod("hflush");
} catch (Exception ex) {
- log.warn("Could not find configured " + syncMethod + " method, trying to fall back to old Hadoop sync method", ex);
-
try {
- // sync: send data to datanodes
- sync = logFile.getClass().getMethod("sync");
+ // fall back to sync: send data to datanodes
+ flush = sync = logFile.getClass().getMethod("sync");
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -387,7 +410,6 @@ public class DfsLogger {
key.tserverSession = filename;
key.filename = filename;
write(key, EMPTY);
- sync.invoke(logFile);
log.debug("Got new write-ahead log: " + this);
} catch (Exception ex) {
if (logFile != null)
@@ -499,12 +521,12 @@ public class DfsLogger {
encryptingLogFile.flush();
}
- public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
- return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
+ public LoggerOperation log(int seq, int tid, Mutation mutation, Durability durability) throws IOException {
+ return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation), durability)));
}
- private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys) throws IOException {
- DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1));
+ private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys, Durability durability) throws IOException {
+ DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1), durability);
synchronized (DfsLogger.this) {
try {
for (Pair<LogFileKey,LogFileValue> pair : keys) {
@@ -531,6 +553,7 @@ public class DfsLogger {
}
public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
+ Durability durability = Durability.NONE;
List<Pair<LogFileKey,LogFileValue>> data = new ArrayList<Pair<LogFileKey,LogFileValue>>();
for (TabletMutations tabletMutations : mutations) {
LogFileKey key = new LogFileKey();
@@ -540,8 +563,10 @@ public class DfsLogger {
LogFileValue value = new LogFileValue();
value.mutations = tabletMutations.getMutations();
data.add(new Pair<LogFileKey,LogFileValue>(key, value));
+ if (tabletMutations.getDurability().ordinal() > durability.ordinal())
+ durability = tabletMutations.getDurability();
}
- return logFileData(data);
+ return logFileData(data, durability);
}
public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
@@ -549,7 +574,7 @@ public class DfsLogger {
key.event = COMPACTION_FINISH;
key.seq = seq;
key.tid = tid;
- return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key, EMPTY)));
+ return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key, EMPTY)), Durability.SYNC);
}
public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
@@ -558,7 +583,7 @@ public class DfsLogger {
key.seq = seq;
key.tid = tid;
key.filename = fqfn;
- return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key, EMPTY)));
+ return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key, EMPTY)), Durability.SYNC);
}
public String getLogger() {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 26e6891..56998d4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.tserver.TabletMutations;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
import org.apache.accumulo.tserver.tablet.CommitSession;
+import org.apache.accumulo.tserver.tablet.Durability;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
@@ -83,14 +84,6 @@ public class TabletServerLogger {
private final AtomicInteger seqGen = new AtomicInteger();
- private static boolean enabled(TableConfiguration tconf) {
- return tconf.getBoolean(Property.TABLE_WALOG_ENABLED);
- }
-
- private static boolean enabled(CommitSession commitSession) {
- return commitSession.getUseWAL();
- }
-
static private abstract class TestCallWithWriteLock {
abstract boolean test();
@@ -369,13 +362,17 @@ public class TabletServerLogger {
});
}
+ private boolean enabled(CommitSession commitSession) {
+ return commitSession.getDurabilty() != Durability.NONE;
+ }
+
public int log(final CommitSession commitSession, final int tabletSeq, final Mutation m) throws IOException {
if (!enabled(commitSession))
return -1;
int seq = write(commitSession, false, new Writer() {
@Override
public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
- return logger.log(tabletSeq, commitSession.getLogId(), m);
+ return logger.log(tabletSeq, commitSession.getLogId(), m, commitSession.getDurabilty());
}
});
logSizeEstimate.addAndGet(m.numBytes());
@@ -398,7 +395,7 @@ public class TabletServerLogger {
List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
CommitSession cs = entry.getKey();
- copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), entry.getValue()));
+ copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), entry.getValue(), cs.getDurabilty()));
}
return logger.logManyTablets(copy);
}
@@ -448,7 +445,7 @@ public class TabletServerLogger {
public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration tconf, List<Path> logs, Set<String> tabletFiles, MutationReceiver mr)
throws IOException {
- if (!enabled(tconf))
+ if (Durability.fromString(tconf.get(Property.TABLE_DURABILITY)) == Durability.NONE)
return;
try {
SortedLogRecovery recovery = new SortedLogRecovery(fs);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
index 6402797..b2d89c9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
@@ -111,8 +111,8 @@ public class CommitSession {
return maxCommittedTime;
}
- public boolean getUseWAL() {
- return committer.getUseWAL();
+ public Durability getDurabilty() {
+ return committer.getDurability();
}
public void mutate(List<Mutation> mutations) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java
new file mode 100644
index 0000000..675b196
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+public enum Durability {
+ NONE,
+ LOG,
+ FLUSH,
+ SYNC;
+
+ static public Durability fromString(String value) {
+ try {
+ return Durability.valueOf(value.toUpperCase());
+ } catch (IllegalArgumentException ex) {
+ return Durability.SYNC;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 37950fc..fdf072a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -2510,8 +2510,8 @@ public class Tablet implements TabletCommitter {
}
@Override
- public boolean getUseWAL() {
- return getTableConfiguration().getBoolean(Property.TABLE_WALOG_ENABLED);
+ public Durability getDurability() {
+ return Durability.fromString(getTableConfiguration().get(Property.TABLE_DURABILITY));
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
index a5d197c..b6bb458 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
@@ -44,7 +44,7 @@ public interface TabletCommitter {
int getLogId();
- boolean getUseWAL();
+ Durability getDurability();
void updateMemoryUsageStats(long estimatedSizeInBytes, long estimatedSizeInBytes2);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java b/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
new file mode 100644
index 0000000..b4d9c83
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class DurabilityIT extends ConfigurableMacIT {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.useMiniDFS(true);
+ cfg.setNumTservers(1);
+ }
+
+ static final long N = 100000;
+
+ String tableNames[] = null;
+
+ void init() throws Exception {
+ synchronized (this) {
+ if (tableNames == null) {
+ tableNames = getUniqueNames(4);
+ Connector c = getConnector();
+ TableOperations tableOps = c.tableOperations();
+ tableOps.create(tableNames[0]);
+ tableOps.create(tableNames[1]);
+ tableOps.create(tableNames[2]);
+ tableOps.create(tableNames[3]);
+ // default is sync
+ tableOps.setProperty(tableNames[1], Property.TABLE_DURABILITY.getKey(), "flush");
+ tableOps.setProperty(tableNames[2], Property.TABLE_DURABILITY.getKey(), "log");
+ tableOps.setProperty(tableNames[3], Property.TABLE_DURABILITY.getKey(), "none");
+ // zookeeper propagation
+ UtilWaitThread.sleep(2 * 1000);
+ }
+ }
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void testWriteSpeed() throws Exception {
+ init();
+ // write some gunk
+ long t0 = writeSome(tableNames[0], N); flush(tableNames[0]);
+ long t1 = writeSome(tableNames[1], N); flush(tableNames[1]);
+ long t2 = writeSome(tableNames[2], N); flush(tableNames[2]);
+ long t3 = writeSome(tableNames[3], N); flush(tableNames[3]);
+ System.out.println(String.format("t0 %d t1 %d t2 %d t3 %d", t0, t1, t2, t3));
+ assertTrue(t0 > t1);
+ assertTrue(t1 > t2);
+ assertTrue(t2 > t3);
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testSync() throws Exception {
+ init();
+ // sync table should lose nothing
+ getConnector().tableOperations().deleteRows(tableNames[0], null, null);
+ writeSome(tableNames[0], N);
+ restartTServer();
+ assertEquals(N, readSome(tableNames[0], N));
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testFlush() throws Exception {
+ init();
+ // flush table won't lose anything since we're not losing power/dfs
+ getConnector().tableOperations().deleteRows(tableNames[1], null, null);
+ writeSome(tableNames[1], N);
+ restartTServer();
+ assertEquals(N, readSome(tableNames[1], N));
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testLog() throws Exception {
+ init();
+ // we're probably going to lose something with the log setting
+ getConnector().tableOperations().deleteRows(tableNames[2], null, null);
+ writeSome(tableNames[2], N);
+ restartTServer();
+ assertTrue(N > readSome(tableNames[2], N));
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testNone() throws Exception {
+ init();
+ // probably won't get any data back without logging
+ getConnector().tableOperations().deleteRows(tableNames[3], null, null);
+ writeSome(tableNames[3], N);
+ restartTServer();
+ assertTrue(N > readSome(tableNames[3], N));
+ }
+
+ private long readSome(String table, long n) throws Exception {
+ long count = 0;
+ for (@SuppressWarnings("unused") Entry<Key,Value> entry : getConnector().createScanner(table, Authorizations.EMPTY)) {
+ count++;
+ }
+ return count;
+ }
+
+ private void restartTServer() throws Exception {
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.start();
+ }
+
+ private void flush(String table) throws Exception {
+ getConnector().tableOperations().flush(table, null, null, true);
+ }
+
+ private long writeSome(String table, long count) throws Exception {
+ long now = System.currentTimeMillis();
+ Connector c = getConnector();
+ BatchWriter bw = c.createBatchWriter(table, null);
+ for (int i = 1; i < count + 1; i++) {
+ String data = "" + i;
+ Mutation m = new Mutation("" + i);
+ m.put(data, data, data);
+ bw.addMutation(m);
+ if (i % (count/100) == 0) {
+ bw.flush();
+ }
+ }
+ bw.close();
+ long result = System.currentTimeMillis() - now;
+ c.tableOperations().flush(table, null, null, true);
+ return result;
+ }
+
+}