You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by ct...@apache.org on 2017/09/18 13:59:23 UTC
[fluo] 02/02: Update formatting for latest Eclipse formatter
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git
commit 42e84cd3f77502614201d0686ff4c16497877c4e
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Thu Sep 14 19:48:16 2017 -0400
Update formatting for latest Eclipse formatter
Updates the code formatting to match the newest Eclipse formatter used by
the formatter-maven-plugin version configured in fluo-parent-2.
---
.../iterators/GarbageCollectionIterator.java | 4 +-
.../accumulo/iterators/NotificationIterator.java | 12 +-
.../fluo/accumulo/iterators/PrewriteIterator.java | 4 +-
.../fluo/accumulo/iterators/SnapshotIterator.java | 4 +-
.../accumulo/iterators/SnapshotIteratorTest.java | 5 +-
.../apache/fluo/accumulo/iterators/TestData.java | 5 +-
.../fluo/api/client/AbstractTransactionBase.java | 4 +-
.../java/org/apache/fluo/api/client/FluoAdmin.java | 4 +-
.../org/apache/fluo/api/client/FluoFactory.java | 5 +-
.../org/apache/fluo/api/client/SnapshotBase.java | 3 +-
.../apache/fluo/api/config/FluoConfiguration.java | 36 +++---
.../main/java/org/apache/fluo/api/data/Bytes.java | 5 +-
.../java/org/apache/fluo/api/data/RowColumn.java | 4 +-
.../main/java/org/apache/fluo/api/data/Span.java | 10 +-
.../api/client/AbstractTransactionBaseTest.java | 8 +-
.../fluo/api/client/scanner/ScannerStreamTest.java | 20 ++--
.../fluo/api/config/FluoConfigurationTest.java | 17 ++-
.../org/apache/fluo/api/data/BytesBuilderTest.java | 5 +-
.../java/org/apache/fluo/api/data/ColumnTest.java | 4 +-
.../org/apache/fluo/api/data/RowColumnTest.java | 8 +-
.../java/org/apache/fluo/api/data/SpanTest.java | 73 ++++++------
.../apache/fluo/cluster/command/FluoCommand.java | 9 +-
.../org/apache/fluo/cluster/runner/AppRunner.java | 5 +-
.../fluo/cluster/runner/ClusterAppRunner.java | 12 +-
.../apache/fluo/cluster/runner/YarnAppRunner.java | 39 +++----
.../org/apache/fluo/cluster/util/FluoInstall.java | 3 +-
.../org/apache/fluo/cluster/yarn/FluoTwillApp.java | 23 ++--
.../java/org/apache/fluo/command/FluoInit.java | 4 +-
.../java/org/apache/fluo/command/FluoScan.java | 4 +-
.../fluo/core/async/AsyncConditionalWriter.java | 14 +--
.../org/apache/fluo/core/client/FluoAdminImpl.java | 37 +++---
.../apache/fluo/core/client/FluoClientImpl.java | 4 +-
.../fluo/core/client/LoaderExecutorAsyncImpl.java | 5 +-
.../org/apache/fluo/core/impl/Environment.java | 7 +-
.../fluo/core/impl/FluoConfigurationImpl.java | 12 +-
.../org/apache/fluo/core/impl/LockResolver.java | 17 ++-
.../fluo/core/impl/ParallelSnapshotScanner.java | 3 +-
.../apache/fluo/core/impl/SharedBatchWriter.java | 3 +-
.../org/apache/fluo/core/impl/SharedResources.java | 28 ++---
.../apache/fluo/core/impl/TimestampTracker.java | 13 +--
.../org/apache/fluo/core/impl/TransactionImpl.java | 42 ++++---
.../org/apache/fluo/core/impl/TransactorCache.java | 15 ++-
.../org/apache/fluo/core/impl/TransactorID.java | 5 +-
.../org/apache/fluo/core/impl/TransactorNode.java | 5 +-
.../java/org/apache/fluo/core/impl/TxInfo.java | 4 +-
.../org/apache/fluo/core/impl/TxInfoCache.java | 5 +-
.../java/org/apache/fluo/core/impl/TxStats.java | 4 +-
.../org/apache/fluo/core/impl/VisibilityCache.java | 3 +-
.../apache/fluo/core/log/TracingCellScanner.java | 12 +-
.../apache/fluo/core/log/TracingColumnScanner.java | 12 +-
.../apache/fluo/core/log/TracingTransaction.java | 27 ++---
.../org/apache/fluo/core/metrics/MetricNames.java | 8 +-
.../fluo/core/metrics/MetricsReporterImpl.java | 4 +-
.../org/apache/fluo/core/metrics/MetricsUtil.java | 5 +-
.../org/apache/fluo/core/metrics/ReporterUtil.java | 5 +-
.../metrics/starters/ConsoleReporterStarter.java | 5 +-
.../core/metrics/starters/CsvReporterStarter.java | 5 +-
.../metrics/starters/Slf4jReporterStarter.java | 5 +-
.../fluo/core/observer/v1/ObserverStoreV1.java | 8 +-
.../apache/fluo/core/observer/v1/ObserversV1.java | 6 +-
.../fluo/core/observer/v2/JsonObservers.java | 10 +-
.../fluo/core/observer/v2/ObserverRegistry.java | 16 +--
.../fluo/core/observer/v2/ObserverStoreV2.java | 4 +-
.../org/apache/fluo/core/oracle/OracleClient.java | 25 ++--
.../org/apache/fluo/core/oracle/OracleServer.java | 25 ++--
.../org/apache/fluo/core/thrift/OracleService.java | 128 ++++++++++-----------
.../java/org/apache/fluo/core/thrift/Stamps.java | 7 +-
.../org/apache/fluo/core/util/AccumuloUtil.java | 7 +-
.../java/org/apache/fluo/core/util/ColumnUtil.java | 6 +-
.../org/apache/fluo/core/util/CuratorUtil.java | 12 +-
.../org/apache/fluo/core/util/FluoExecutors.java | 37 +++---
.../apache/fluo/core/util/FluoThreadFactory.java | 1 -
.../java/org/apache/fluo/core/util/ScanUtil.java | 22 ++--
.../core/worker/NotificationFinderFactory.java | 5 +-
.../fluo/core/worker/NotificationProcessor.java | 9 +-
.../core/worker/finder/hash/PartitionManager.java | 27 ++---
.../finder/hash/PartitionNotificationFinder.java | 5 +-
.../core/worker/finder/hash/SerializedSplits.java | 5 +-
.../fluo/core/worker/finder/hash/TableRange.java | 4 +-
.../org/apache/fluo/core/util/ByteUtilTest.java | 7 +-
.../worker/finder/hash/PartitionManagerTest.java | 10 +-
.../worker/finder/hash/SerializedSplitsTest.java | 15 +--
.../core/worker/finder/hash/TableRangeTest.java | 5 +-
.../java/org/apache/fluo/integration/BankUtil.java | 3 +-
.../java/org/apache/fluo/integration/ITBase.java | 8 +-
.../apache/fluo/integration/TestTransaction.java | 8 +-
.../fluo/integration/client/FluoAdminImplIT.java | 3 +-
.../apache/fluo/integration/impl/CollisionIT.java | 9 +-
.../apache/fluo/integration/impl/FaultyConfig.java | 3 +-
.../org/apache/fluo/integration/impl/FluoIT.java | 5 +-
.../fluo/integration/impl/ObserverConfigIT.java | 19 ++-
.../apache/fluo/integration/impl/ScannerIT.java | 23 ++--
.../fluo/integration/impl/StochasticBankIT.java | 4 +-
.../integration/impl/StrongNotificationIT.java | 7 +-
.../fluo/integration/impl/TimestampTrackerIT.java | 12 +-
.../apache/fluo/integration/impl/TransactorIT.java | 8 +-
.../org/apache/fluo/integration/impl/WorkerIT.java | 4 +-
.../apache/fluo/integration/impl/ZKSecretIT.java | 6 +-
.../org/apache/fluo/integration/log/LogIT.java | 32 +++---
.../fluo/mapreduce/FluoEntryInputFormat.java | 13 +--
.../apache/fluo/mapreduce/FluoOutputFormat.java | 13 +--
.../apache/fluo/mapreduce/FluoRowInputFormat.java | 13 +--
.../fluo/mapreduce/it/FluoFileOutputFormatIT.java | 8 +-
103 files changed, 585 insertions(+), 675 deletions(-)
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
index fda0e80..3d3ee0e 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java
@@ -60,8 +60,8 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
static final String GC_TIMESTAMP_OPT = "timestamp.gc";
private static final String ZOOKEEPER_CONNECT_OPT = "zookeeper.connect";
- private static final ByteSequence NOTIFY_CF_BS = new ArrayByteSequence(
- ColumnConstants.NOTIFY_CF.toArray());
+ private static final ByteSequence NOTIFY_CF_BS =
+ new ArrayByteSequence(ColumnConstants.NOTIFY_CF.toArray());
private Long gcTimestamp;
private SortedKeyValueIterator<Key, Value> source;
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/NotificationIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/NotificationIterator.java
index b4440c5..5e5d986 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/NotificationIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/NotificationIterator.java
@@ -50,8 +50,8 @@ import static org.apache.fluo.accumulo.util.NotificationUtil.isNtfy;
*/
public class NotificationIterator extends SkippingIterator {
- public static final ByteSequence NTFY_CF = new ArrayByteSequence(
- ColumnConstants.NOTIFY_CF.toArray());
+ public static final ByteSequence NTFY_CF =
+ new ArrayByteSequence(ColumnConstants.NOTIFY_CF.toArray());
private boolean scanOrFullMajc;
private boolean lastKeySet = false;
@@ -62,7 +62,8 @@ public class NotificationIterator extends SkippingIterator {
private void skipRowCol(PushbackIterator source, Key key) throws IOException {
int count = 0;
- while (source.hasTop() && source.getTopKey().equals(key, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+ while (source.hasTop()
+ && source.getTopKey().equals(key, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
if (count == 10) {
Key nextKey = key.followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS);
if (!seekRange.afterEndKey(nextKey)) {
@@ -146,9 +147,8 @@ public class NotificationIterator extends SkippingIterator {
@Override
public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
IteratorEnvironment env) throws IOException {
- scanOrFullMajc =
- env.getIteratorScope() == IteratorScope.scan
- || (env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction());
+ scanOrFullMajc = env.getIteratorScope() == IteratorScope.scan
+ || (env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction());
super.init(new PushbackIterator(source), options, env);
}
}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
index d3b8125..a5a160d 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java
@@ -107,8 +107,8 @@ public class PrewriteIterator implements SortedKeyValueIterator<Key, Value> {
hasTop = false;
long invalidationTime = -1;
- while (source.hasTop()
- && seekRange.getStartKey().equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
+ while (source.hasTop() && seekRange.getStartKey().equals(source.getTopKey(),
+ PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
index d542b06..3c6939c 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java
@@ -38,8 +38,8 @@ public class SnapshotIterator implements SortedKeyValueIterator<Key, Value> {
@VisibleForTesting
static final String TIMESTAMP_OPT = "timestampOpt";
- private static final ByteSequence NOTIFY_CF_BS = new ArrayByteSequence(
- ColumnConstants.NOTIFY_CF.toArray());
+ private static final ByteSequence NOTIFY_CF_BS =
+ new ArrayByteSequence(ColumnConstants.NOTIFY_CF.toArray());
static final Set<ByteSequence> NOTIFY_CF_SET = Collections.singleton(NOTIFY_CF_BS);
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
index 200f926..4605923 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java
@@ -231,9 +231,8 @@ public class SnapshotIteratorTest {
input.add("1 f q1 DATA " + startTime, "" + val2);
}
- Range[] ranges =
- new Range[] {new Range(), Range.exact("0", "f", "q1"), Range.exact("1", "f", "q1"),
- Range.exact("2", "f", "q1")};
+ Range[] ranges = new Range[] {new Range(), Range.exact("0", "f", "q1"),
+ Range.exact("1", "f", "q1"), Range.exact("2", "f", "q1")};
for (Range range : ranges) {
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
index 99b2b41..2ea42b3 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java
@@ -100,9 +100,8 @@ public class TestData {
case "LOCK":
ts |= ColumnConstants.LOCK_PREFIX;
String rc[] = value.split("\\s+");
- val =
- LockValue.encode(Bytes.of(rc[0]), new Column(rc[1], rc[2]), value.contains("WRITE"),
- value.contains("DELETE"), value.contains("TRIGGER"), 42l);
+ val = LockValue.encode(Bytes.of(rc[0]), new Column(rc[1], rc[2]), value.contains("WRITE"),
+ value.contains("DELETE"), value.contains("TRIGGER"), 42l);
break;
case "DATA":
ts |= ColumnConstants.DATA_PREFIX;
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
index 3507e3c..8e44590 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/AbstractTransactionBase.java
@@ -24,8 +24,8 @@ import org.apache.fluo.api.exceptions.AlreadySetException;
* to make implementing TransactionBase easier.
*/
-public abstract class AbstractTransactionBase extends AbstractSnapshotBase implements
- TransactionBase {
+public abstract class AbstractTransactionBase extends AbstractSnapshotBase
+ implements TransactionBase {
public void delete(CharSequence row, Column col) {
delete(s2bConv(row), col);
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java b/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java
index c0558f2..fb25759 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java
@@ -116,8 +116,8 @@ public interface FluoAdmin extends AutoCloseable {
* {@link TableExistsException} if Accumulo table exists. If you want to clear table, set
* {@link InitializationOptions#setClearTable(boolean)} to true.
*/
- void initialize(InitializationOptions opts) throws AlreadyInitializedException,
- TableExistsException;
+ void initialize(InitializationOptions opts)
+ throws AlreadyInitializedException, TableExistsException;
/**
* Updates shared configuration in Zookeeper. Shared configuration consists of all properties
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java b/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
index 98d260a..01c210c 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
@@ -116,9 +116,8 @@ public class FluoFactory {
return (T) Class.forName(clazz).getDeclaredConstructor(FluoConfiguration.class)
.newInstance(config);
} catch (ClassNotFoundException e) {
- String msg =
- "Could not find " + clazz
- + " class which could be caused by fluo-core jar not being on the classpath.";
+ String msg = "Could not find " + clazz
+ + " class which could be caused by fluo-core jar not being on the classpath.";
throw new FluoException(msg, e);
} catch (InvocationTargetException e) {
String msg = "Failed to construct " + clazz + " class due to exception";
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
index ac8bc40..51c29b9 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
@@ -128,7 +128,8 @@ public interface SnapshotBase {
* Wrapper for {@link #get(Collection, Set)} that uses Strings. All strings are encoded and
* decoded using UTF-8.
*/
- Map<String, Map<Column, String>> gets(Collection<? extends CharSequence> rows, Set<Column> columns);
+ Map<String, Map<Column, String>> gets(Collection<? extends CharSequence> rows,
+ Set<Column> columns);
/**
* Wrapper for {@link #get(Collection, Set)} that uses Strings. All strings are encoded and
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index 6c66b6f..89efb52 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -72,8 +72,8 @@ public class FluoConfiguration extends SimpleConfiguration {
* @deprecated since 1.2.0 replaced by fluo.accumulo.zookeepers
*/
@Deprecated
- public static final String CLIENT_ACCUMULO_ZOOKEEPERS_PROP = CLIENT_PREFIX
- + ".accumulo.zookeepers";
+ public static final String CLIENT_ACCUMULO_ZOOKEEPERS_PROP =
+ CLIENT_PREFIX + ".accumulo.zookeepers";
/**
* @deprecated since 1.2.0 replaced by fluo.connection.zookeeper.timeout
*/
@@ -106,13 +106,13 @@ public class FluoConfiguration extends SimpleConfiguration {
/**
* @since 1.2.0
*/
- public static final String CONNECTION_APPLICATION_NAME_PROP = CONNECTION_PREFIX
- + ".application.name";
+ public static final String CONNECTION_APPLICATION_NAME_PROP =
+ CONNECTION_PREFIX + ".application.name";
/**
* @since 1.2.0
*/
- public static final String CONNECTION_ZOOKEEPER_TIMEOUT_PROP = CONNECTION_PREFIX
- + ".zookeeper.timeout";
+ public static final String CONNECTION_ZOOKEEPER_TIMEOUT_PROP =
+ CONNECTION_PREFIX + ".zookeeper.timeout";
/**
* @since 1.2.0
@@ -127,8 +127,8 @@ public class FluoConfiguration extends SimpleConfiguration {
/**
* @since 1.2.0
*/
- public static final String CONNECTION_RETRY_TIMEOUT_MS_PROP = CONNECTION_PREFIX
- + ".retry.timeout.ms";
+ public static final String CONNECTION_RETRY_TIMEOUT_MS_PROP =
+ CONNECTION_PREFIX + ".retry.timeout.ms";
public static final int CONNECTION_ZOOKEEPER_TIMEOUT_DEFAULT = 30000;
public static final String CONNECTION_ZOOKEEPERS_DEFAULT = "localhost/fluo";
public static final int CONNECTION_RETRY_TIMEOUT_MS_DEFAULT = -1;
@@ -318,15 +318,15 @@ public class FluoConfiguration extends SimpleConfiguration {
} else if (c == '/' || c == '.' || c == ':') {
reason = "invalid character '" + c + "'";
break;
- } else if (c > '\u0000' && c <= '\u001f' || c >= '\u007f' && c <= '\u009F' || c >= '\ud800'
- && c <= '\uf8ff' || c >= '\ufff0' && c <= '\uffff') {
+ } else if (c > '\u0000' && c <= '\u001f' || c >= '\u007f' && c <= '\u009F'
+ || c >= '\ud800' && c <= '\uf8ff' || c >= '\ufff0' && c <= '\uffff') {
reason = "invalid character @" + i;
break;
}
}
if (reason != null) {
- throw new IllegalArgumentException("Invalid application name \"" + name + "\" caused by "
- + reason);
+ throw new IllegalArgumentException(
+ "Invalid application name \"" + name + "\" caused by " + reason);
}
}
@@ -393,8 +393,8 @@ public class FluoConfiguration extends SimpleConfiguration {
* @since 1.2.0
*/
public FluoConfiguration setConnectionRetryTimeout(int timeoutMS) {
- Preconditions.checkArgument(timeoutMS >= -1, CONNECTION_RETRY_TIMEOUT_MS_PROP
- + " must be >= -1");
+ Preconditions.checkArgument(timeoutMS >= -1,
+ CONNECTION_RETRY_TIMEOUT_MS_PROP + " must be >= -1");
setProperty(CONNECTION_RETRY_TIMEOUT_MS_PROP, timeoutMS);
return this;
}
@@ -557,8 +557,8 @@ public class FluoConfiguration extends SimpleConfiguration {
+ " has invalid param. Expected 'key=value' but encountered '" + fields[i] + "'");
}
if (kv[0].isEmpty() || kv[1].isEmpty()) {
- throw new IllegalArgumentException(key + " has empty key or value in param: "
- + fields[i]);
+ throw new IllegalArgumentException(
+ key + " has empty key or value in param: " + fields[i]);
}
params.put(kv[0], kv[1]);
}
@@ -987,8 +987,8 @@ public class FluoConfiguration extends SimpleConfiguration {
}
private String getDepNonEmptyString(String property, String depProperty, String defaultValue) {
- return containsKey(property) ? getNonEmptyString(property, defaultValue) : getNonEmptyString(
- depProperty, defaultValue);
+ return containsKey(property) ? getNonEmptyString(property, defaultValue)
+ : getNonEmptyString(depProperty, defaultValue);
}
private String getDepNonEmptyString(String property, String depProperty) {
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
index c490329..2df25cc 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
@@ -293,9 +293,8 @@ public final class Bytes implements Comparable<Bytes>, Serializable {
}
byte[] data;
if (bb.hasArray()) {
- data =
- Arrays.copyOfRange(bb.array(), bb.position() + bb.arrayOffset(),
- bb.limit() + bb.arrayOffset());
+ data = Arrays.copyOfRange(bb.array(), bb.position() + bb.arrayOffset(),
+ bb.limit() + bb.arrayOffset());
} else {
data = new byte[bb.remaining()];
// duplicate so that it does not change position
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
index 804f653..7b2a8c2 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
@@ -150,8 +150,8 @@ public final class RowColumn implements Comparable<RowColumn>, Serializable {
} else if (!col.isVisibilitySet()) {
return new RowColumn(row, new Column(col.getFamily(), followingBytes(col.getQualifier())));
} else {
- return new RowColumn(row, new Column(col.getFamily(), col.getQualifier(),
- followingBytes(col.getVisibility())));
+ return new RowColumn(row,
+ new Column(col.getFamily(), col.getQualifier(), followingBytes(col.getVisibility())));
}
}
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Span.java b/modules/api/src/main/java/org/apache/fluo/api/data/Span.java
index 603ce64..864edca 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/Span.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/Span.java
@@ -292,15 +292,13 @@ public final class Span implements Serializable {
if (colPrefix.isVisibilitySet()) {
Bytes fp = followingPrefix(cv);
- RowColumn end =
- (fp == null ? new RowColumn(row, new Column(cf, cq)).following() : new RowColumn(row,
- new Column(cf, cq, fp)));
+ RowColumn end = (fp == null ? new RowColumn(row, new Column(cf, cq)).following()
+ : new RowColumn(row, new Column(cf, cq, fp)));
return new Span(new RowColumn(row, colPrefix), true, end, false);
} else if (colPrefix.isQualifierSet()) {
Bytes fp = followingPrefix(cq);
- RowColumn end =
- (fp == null ? new RowColumn(row, new Column(cf)).following() : new RowColumn(row,
- new Column(cf, fp)));
+ RowColumn end = (fp == null ? new RowColumn(row, new Column(cf)).following()
+ : new RowColumn(row, new Column(cf, fp)));
return new Span(new RowColumn(row, colPrefix), true, end, false);
} else if (colPrefix.isFamilySet()) {
Bytes fp = followingPrefix(cf);
diff --git a/modules/api/src/test/java/org/apache/fluo/api/client/AbstractTransactionBaseTest.java b/modules/api/src/test/java/org/apache/fluo/api/client/AbstractTransactionBaseTest.java
index 6e08231..fcffe55 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/client/AbstractTransactionBaseTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/client/AbstractTransactionBaseTest.java
@@ -142,8 +142,8 @@ public class AbstractTransactionBaseTest {
public void testColumnsVarargs() {
MockTransaction tx = new MockTransaction(createSnapshot());
- Assert
- .assertEquals(ImmutableMap.of(COL1, Bytes.of("v1")), tx.get(Bytes.of("row1"), COL1, COL2));
+ Assert.assertEquals(ImmutableMap.of(COL1, Bytes.of("v1")),
+ tx.get(Bytes.of("row1"), COL1, COL2));
Assert.assertEquals(ImmutableMap.of(COL1, Bytes.of("v2"), COL2, Bytes.of("v3")),
tx.get(Bytes.of("row2"), COL1, COL2));
@@ -191,8 +191,8 @@ public class AbstractTransactionBaseTest {
tx.setWeakNotification("row5", COL2);
Assert.assertEquals(ImmutableSet.of(new RowColumn("row5", COL1)), tx.deletes);
- Assert.assertEquals(ImmutableSet.of(new RowColumnValue("row9", COL2, "99"), new RowColumnValue(
- "row8", COL2, "88")), tx.sets);
+ Assert.assertEquals(ImmutableSet.of(new RowColumnValue("row9", COL2, "99"),
+ new RowColumnValue("row8", COL2, "88")), tx.sets);
Assert.assertEquals(ImmutableSet.of(new RowColumn("row5", COL2)), tx.weakNtfys);
tx.close();
diff --git a/modules/api/src/test/java/org/apache/fluo/api/client/scanner/ScannerStreamTest.java b/modules/api/src/test/java/org/apache/fluo/api/client/scanner/ScannerStreamTest.java
index d885552..9b1e1d3 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/client/scanner/ScannerStreamTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/client/scanner/ScannerStreamTest.java
@@ -48,12 +48,10 @@ public class ScannerStreamTest {
CellScannerImpl cellScanner = new CellScannerImpl(rowCols);
- Set<RowColumnValue> expected =
- rowCols.stream().filter(rcv -> rcv.getColumn().getsFamily().equals("f2"))
- .collect(Collectors.toSet());
- Set<RowColumnValue> actualSubSet =
- cellScanner.stream().filter(rcv -> rcv.getColumn().getsFamily().equals("f2"))
- .collect(Collectors.toSet());
+ Set<RowColumnValue> expected = rowCols.stream()
+ .filter(rcv -> rcv.getColumn().getsFamily().equals("f2")).collect(Collectors.toSet());
+ Set<RowColumnValue> actualSubSet = cellScanner.stream()
+ .filter(rcv -> rcv.getColumn().getsFamily().equals("f2")).collect(Collectors.toSet());
Assert.assertNotEquals(empty, actualSubSet);
Assert.assertNotEquals(empty, cellScanner.stream().collect(Collectors.toSet()));
@@ -75,12 +73,10 @@ public class ScannerStreamTest {
ColumnScanner colScanner = new ColumnScannerImpl(row, colsVal);
- Set<ColumnValue> expected =
- colsVal.stream().filter(cv -> cv.getColumn().getsFamily().equals("f2"))
- .collect(Collectors.toSet());
- Set<ColumnValue> colSubSet =
- colScanner.stream().filter(cv -> cv.getColumn().getsFamily().equals("f2"))
- .collect(Collectors.toSet());
+ Set<ColumnValue> expected = colsVal.stream()
+ .filter(cv -> cv.getColumn().getsFamily().equals("f2")).collect(Collectors.toSet());
+ Set<ColumnValue> colSubSet = colScanner.stream()
+ .filter(cv -> cv.getColumn().getsFamily().equals("f2")).collect(Collectors.toSet());
Assert.assertNotEquals(empty, colSubSet);
Assert.assertNotEquals(empty, colScanner.stream().collect(Collectors.toSet()));
diff --git a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
index c4888d9..316d0fc 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
@@ -47,8 +47,8 @@ public class FluoConfigurationTest {
base.getZookeeperTimeout());
Assert.assertEquals(FluoConfiguration.CONNECTION_RETRY_TIMEOUT_MS_DEFAULT,
base.getConnectionRetryTimeout());
- Assert
- .assertEquals(FluoConfiguration.ACCUMULO_ZOOKEEPERS_DEFAULT, base.getAccumuloZookeepers());
+ Assert.assertEquals(FluoConfiguration.ACCUMULO_ZOOKEEPERS_DEFAULT,
+ base.getAccumuloZookeepers());
Assert.assertEquals(FluoConfiguration.ADMIN_ACCUMULO_CLASSPATH_DEFAULT,
base.getAccumuloClasspath());
Assert.assertEquals(FluoConfiguration.WORKER_NUM_THREADS_DEFAULT, base.getWorkerThreads());
@@ -86,8 +86,8 @@ public class FluoConfigurationTest {
@Test
public void testSetGet() {
FluoConfiguration config = new FluoConfiguration();
- Assert.assertEquals("path1,path2", config.setAccumuloClasspath("path1,path2")
- .getAccumuloClasspath());
+ Assert.assertEquals("path1,path2",
+ config.setAccumuloClasspath("path1,path2").getAccumuloClasspath());
Assert.assertEquals("path1,path2", config.setAccumuloJars("path1,path2").getAccumuloJars());
Assert.assertEquals("instance", config.setAccumuloInstance("instance").getAccumuloInstance());
Assert.assertEquals("pass", config.setAccumuloPassword("pass").getAccumuloPassword());
@@ -106,8 +106,8 @@ public class FluoConfigurationTest {
Assert.assertFalse(config.setMiniStartAccumulo(false).getMiniStartAccumulo());
Assert.assertEquals("mydata", config.setMiniDataDir("mydata").getMiniDataDir());
Assert.assertEquals(17, config.setConnectionRetryTimeout(17).getConnectionRetryTimeout());
- Assert.assertEquals("/path/to/dir", config.setObserverInitDir("/path/to/dir")
- .getObserverInitDir());
+ Assert.assertEquals("/path/to/dir",
+ config.setObserverInitDir("/path/to/dir").getObserverInitDir());
Assert.assertEquals("hdfs://localhost/mydir",
config.setObserverJarsUrl("hdfs://localhost/mydir").getObserverJarsUrl());
Assert.assertEquals("hdfs123", config.setDfsRoot("hdfs123").getDfsRoot());
@@ -442,9 +442,8 @@ public class FluoConfigurationTest {
Assert.fail();
}
}
- String[] nonEmptyMethods =
- {"setAccumuloInstance", "setAccumuloTable", "setAccumuloUser", "setAccumuloZookeepers",
- "setMiniDataDir", "setInstanceZookeepers", "setDfsRoot"};
+ String[] nonEmptyMethods = {"setAccumuloInstance", "setAccumuloTable", "setAccumuloUser",
+ "setAccumuloZookeepers", "setMiniDataDir", "setInstanceZookeepers", "setDfsRoot"};
for (String methodName : nonEmptyMethods) {
try {
config.getClass().getMethod(methodName, String.class).invoke(config, "");
diff --git a/modules/api/src/test/java/org/apache/fluo/api/data/BytesBuilderTest.java b/modules/api/src/test/java/org/apache/fluo/api/data/BytesBuilderTest.java
index 8c44861..3ffb27d 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/data/BytesBuilderTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/data/BytesBuilderTest.java
@@ -172,9 +172,8 @@ public class BytesBuilderTest {
Assert.assertEquals("abc123:xyz789", b3.toString());
- Bytes b4 =
- Bytes.builder().append((CharSequence) "abc123").append(":").append((CharSequence) "xyz789")
- .toBytes();
+ Bytes b4 = Bytes.builder().append((CharSequence) "abc123").append(":")
+ .append((CharSequence) "xyz789").toBytes();
Assert.assertEquals("abc123:xyz789", b4.toString());
}
diff --git a/modules/api/src/test/java/org/apache/fluo/api/data/ColumnTest.java b/modules/api/src/test/java/org/apache/fluo/api/data/ColumnTest.java
index 11e6dd0..e599a60 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/data/ColumnTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/data/ColumnTest.java
@@ -46,8 +46,8 @@ public class ColumnTest {
Assert.assertEquals(new Column("a"), new Column(Bytes.of("a")));
Assert.assertEquals(new Column("a"), new Column(Bytes.of("a"), Bytes.EMPTY, Bytes.EMPTY));
- Assert.assertEquals(new Column("a").hashCode(), new Column(Bytes.of("a"), Bytes.EMPTY,
- Bytes.EMPTY).hashCode());
+ Assert.assertEquals(new Column("a").hashCode(),
+ new Column(Bytes.of("a"), Bytes.EMPTY, Bytes.EMPTY).hashCode());
col = new Column("cf1");
Assert.assertTrue(col.isFamilySet());
diff --git a/modules/api/src/test/java/org/apache/fluo/api/data/RowColumnTest.java b/modules/api/src/test/java/org/apache/fluo/api/data/RowColumnTest.java
index 85b5eb6..bc8df79 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/data/RowColumnTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/data/RowColumnTest.java
@@ -61,10 +61,10 @@ public class RowColumnTest {
Assert.assertEquals(RowColumn.EMPTY, new RowColumn().following());
Assert.assertEquals(new RowColumn(fb), new RowColumn("data").following());
- Assert.assertEquals(new RowColumn("row", new Column(fb)), new RowColumn("row", new Column(
- "data")).following());
- Assert.assertEquals(new RowColumn("row", new Column(Bytes.of("cf"), fb)), new RowColumn("row",
- new Column("cf", "data")).following());
+ Assert.assertEquals(new RowColumn("row", new Column(fb)),
+ new RowColumn("row", new Column("data")).following());
+ Assert.assertEquals(new RowColumn("row", new Column(Bytes.of("cf"), fb)),
+ new RowColumn("row", new Column("cf", "data")).following());
Assert.assertEquals(new RowColumn("row", new Column(Bytes.of("cf"), Bytes.of("cq"), fb)),
new RowColumn("row", new Column("cf", "cq", "data")).following());
}
diff --git a/modules/api/src/test/java/org/apache/fluo/api/data/SpanTest.java b/modules/api/src/test/java/org/apache/fluo/api/data/SpanTest.java
index 305b3ba..8866fd0 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/data/SpanTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/data/SpanTest.java
@@ -51,22 +51,22 @@ public class SpanTest {
// Test with Bytes input
Assert.assertEquals(new Span(rw1b, true, rw2b, false),
Span.newBuilder().startRow(rw1b).endRow(rw2b).exclusive().build());
- Assert.assertEquals(new Span(rw1b, false, rw2b, false), Span.newBuilder().startRow(rw1b)
- .exclusive().endRow(rw2b).exclusive().build());
+ Assert.assertEquals(new Span(rw1b, false, rw2b, false),
+ Span.newBuilder().startRow(rw1b).exclusive().endRow(rw2b).exclusive().build());
Assert.assertEquals(new Span(rw1b, true, rw2b, true),
Span.newBuilder().startRow(rw1b).endRow(rw2b).build());
- Assert.assertEquals(new Span(rw1b, false, rw2b, true), Span.newBuilder().startRow(rw1b)
- .exclusive().endRow(rw2b).build());
+ Assert.assertEquals(new Span(rw1b, false, rw2b, true),
+ Span.newBuilder().startRow(rw1b).exclusive().endRow(rw2b).build());
// Test with String input
Assert.assertEquals(new Span(rw1b, true, rw2b, false),
Span.newBuilder().startRow(rw1s).endRow(rw2s).exclusive().build());
- Assert.assertEquals(new Span(rw1b, false, rw2b, false), Span.newBuilder().startRow(rw1s)
- .exclusive().endRow(rw2s).exclusive().build());
+ Assert.assertEquals(new Span(rw1b, false, rw2b, false),
+ Span.newBuilder().startRow(rw1s).exclusive().endRow(rw2s).exclusive().build());
Assert.assertEquals(new Span(rw1b, true, rw2b, true),
Span.newBuilder().startRow(rw1s).endRow(rw2s).build());
- Assert.assertEquals(new Span(rw1b, false, rw2b, true), Span.newBuilder().startRow(rw1s)
- .exclusive().endRow(rw2s).build());
+ Assert.assertEquals(new Span(rw1b, false, rw2b, true),
+ Span.newBuilder().startRow(rw1s).exclusive().endRow(rw2s).build());
}
@Test
@@ -76,12 +76,12 @@ public class SpanTest {
RowColumn rc2 = new RowColumn(rw2b, new Column(cf2b));
RowColumn frc2 = rc2.following();
- Assert.assertEquals(new Span(RowColumn.EMPTY, true, frc2, false), Span.newBuilder()
- .endRow(rw2b).fam(cf2b).build());
- Assert.assertEquals(new Span(RowColumn.EMPTY, true, rc2, false), Span.newBuilder().endRow(rw2b)
- .fam(cf2b).exclusive().build());
- Assert.assertEquals(new Span(rc1, true, RowColumn.EMPTY, true), Span.newBuilder()
- .startRow(rw1b).fam(cf1b).build());
+ Assert.assertEquals(new Span(RowColumn.EMPTY, true, frc2, false),
+ Span.newBuilder().endRow(rw2b).fam(cf2b).build());
+ Assert.assertEquals(new Span(RowColumn.EMPTY, true, rc2, false),
+ Span.newBuilder().endRow(rw2b).fam(cf2b).exclusive().build());
+ Assert.assertEquals(new Span(rc1, true, RowColumn.EMPTY, true),
+ Span.newBuilder().startRow(rw1b).fam(cf1b).build());
Assert.assertEquals(new Span(frc1, true, RowColumn.EMPTY, true),
Span.newBuilder().startRow(rw1b).fam(cf1b).exclusive().build());
}
@@ -93,14 +93,14 @@ public class SpanTest {
RowColumn rc2 = new RowColumn(rw2b, new Column(cf2b));
RowColumn frc2 = rc2.following();
- Assert.assertEquals(new Span(rc1, true, frc2, false), Span.newBuilder().startRow(rw1b)
- .fam(cf1b).endRow(rw2b).fam(cf2b).build());
- Assert.assertEquals(new Span(rc1, true, rc2, false), Span.newBuilder().startRow(rw1b).fam(cf1b)
- .endRow(rw2b).fam(cf2b).exclusive().build());
+ Assert.assertEquals(new Span(rc1, true, frc2, false),
+ Span.newBuilder().startRow(rw1b).fam(cf1b).endRow(rw2b).fam(cf2b).build());
+ Assert.assertEquals(new Span(rc1, true, rc2, false),
+ Span.newBuilder().startRow(rw1b).fam(cf1b).endRow(rw2b).fam(cf2b).exclusive().build());
Assert.assertEquals(new Span(frc1, true, frc2, false),
Span.newBuilder().startRow(rw1b).fam(cf1b).exclusive().endRow(rw2b).fam(cf2b).build());
- Assert.assertEquals(new Span(frc1, true, rc2, false), Span.newBuilder().startRow(rw1b)
- .fam(cf1b).exclusive().endRow(rw2b).fam(cf2b).exclusive().build());
+ Assert.assertEquals(new Span(frc1, true, rc2, false), Span.newBuilder().startRow(rw1b).fam(cf1b)
+ .exclusive().endRow(rw2b).fam(cf2b).exclusive().build());
}
@Test
@@ -110,15 +110,14 @@ public class SpanTest {
RowColumn rc2 = new RowColumn(rw2b, new Column(cf2b, cq2b));
RowColumn frc2 = rc2.following();
- Assert.assertEquals(new Span(rc1, true, frc2, false), Span.newBuilder().startRow(rw1b)
- .fam(cf1b).qual(cq1b).endRow(rw2b).fam(cf2b).qual(cq2b).build());
+ Assert.assertEquals(new Span(rc1, true, frc2, false), Span.newBuilder().startRow(rw1b).fam(cf1b)
+ .qual(cq1b).endRow(rw2b).fam(cf2b).qual(cq2b).build());
Assert.assertEquals(new Span(rc1, true, rc2, false), Span.newBuilder().startRow(rw1b).fam(cf1b)
.qual(cq1b).endRow(rw2b).fam(cf2b).qual(cq2b).exclusive().build());
- Assert.assertEquals(new Span(frc1, true, frc2, false),
- Span.newBuilder().startRow(rw1b).fam(cf1b).qual(cq1b).exclusive().endRow(rw2b).fam(cf2b)
- .qual(cq2b).build());
- Assert.assertEquals(new Span(frc1, true, rc2, false), Span.newBuilder().startRow(rw1b)
- .fam(cf1b).qual(cq1b).exclusive().endRow(rw2b).fam(cf2b).qual(cq2b).exclusive().build());
+ Assert.assertEquals(new Span(frc1, true, frc2, false), Span.newBuilder().startRow(rw1b)
+ .fam(cf1b).qual(cq1b).exclusive().endRow(rw2b).fam(cf2b).qual(cq2b).build());
+ Assert.assertEquals(new Span(frc1, true, rc2, false), Span.newBuilder().startRow(rw1b).fam(cf1b)
+ .qual(cq1b).exclusive().endRow(rw2b).fam(cf2b).qual(cq2b).exclusive().build());
}
@Test
@@ -128,16 +127,16 @@ public class SpanTest {
RowColumn rc2 = new RowColumn(rw2b, new Column(cf2b, cq2b, Bytes.of(cv2s)));
RowColumn frc2 = rc2.following();
- Assert.assertEquals(new Span(rc1, true, frc2, false), Span.newBuilder().startRow(rw1b)
- .fam(cf1b).qual(cq1b).vis(cv1b).endRow(rw2b).fam(cf2b).qual(cq2b).vis(cv2b).build());
+ Assert.assertEquals(new Span(rc1, true, frc2, false), Span.newBuilder().startRow(rw1b).fam(cf1b)
+ .qual(cq1b).vis(cv1b).endRow(rw2b).fam(cf2b).qual(cq2b).vis(cv2b).build());
Assert.assertEquals(new Span(rc1, true, rc2, false), Span.newBuilder().startRow(rw1b).fam(cf1b)
.qual(cq1b).vis(cv1b).endRow(rw2b).fam(cf2b).qual(cq2b).vis(cv2b).exclusive().build());
Assert.assertEquals(new Span(frc1, true, frc2, false),
Span.newBuilder().startRow(rw1b).fam(cf1b).qual(cq1b).vis(cv1b).exclusive().endRow(rw2b)
.fam(cf2b).qual(cq2b).vis(cv2b).build());
- Assert.assertEquals(new Span(frc1, true, rc2, false), Span.newBuilder().startRow(rw1b)
- .fam(cf1b).qual(cq1b).vis(cv1b).exclusive().endRow(rw2b).fam(cf2b).qual(cq2b).vis(cv2b)
- .exclusive().build());
+ Assert.assertEquals(new Span(frc1, true, rc2, false),
+ Span.newBuilder().startRow(rw1b).fam(cf1b).qual(cq1b).vis(cv1b).exclusive().endRow(rw2b)
+ .fam(cf2b).qual(cq2b).vis(cv2b).exclusive().build());
}
@Test
@@ -179,8 +178,9 @@ public class SpanTest {
Assert.assertTrue(s.isStartInclusive());
Assert.assertEquals(rw1b, s.getEnd().getRow());
Assert.assertEquals(cf1b, s.getEnd().getColumn().getFamily());
- Assert.assertEquals(new RowColumn(rw1b, new Column(cf1b, cq1b)).following().getColumn()
- .getQualifier(), s.getEnd().getColumn().getQualifier());
+ Assert.assertEquals(
+ new RowColumn(rw1b, new Column(cf1b, cq1b)).following().getColumn().getQualifier(),
+ s.getEnd().getColumn().getQualifier());
Assert.assertFalse(s.getEnd().getColumn().isVisibilitySet());
Assert.assertFalse(s.isEndInclusive());
@@ -193,8 +193,9 @@ public class SpanTest {
Assert.assertEquals(rw1b, s.getEnd().getRow());
Assert.assertEquals(cf1b, s.getEnd().getColumn().getFamily());
Assert.assertEquals(cq1b, s.getEnd().getColumn().getQualifier());
- Assert.assertEquals(new RowColumn(rw1b, new Column(cf1b, cq1b, cv1b)).following().getColumn()
- .getVisibility(), s.getEnd().getColumn().getVisibility());
+ Assert.assertEquals(
+ new RowColumn(rw1b, new Column(cf1b, cq1b, cv1b)).following().getColumn().getVisibility(),
+ s.getEnd().getColumn().getVisibility());
Assert.assertFalse(s.isEndInclusive());
}
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/command/FluoCommand.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/command/FluoCommand.java
index 689173f..447257b 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/command/FluoCommand.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/command/FluoCommand.java
@@ -32,8 +32,8 @@ public class FluoCommand {
public static void verifyNoArgs(String[] remainArgs) {
if (remainArgs.length != 0) {
- System.err.println("ERROR - Received unexpected command-line arguments: "
- + Arrays.toString(remainArgs));
+ System.err.println(
+ "ERROR - Received unexpected command-line arguments: " + Arrays.toString(remainArgs));
System.exit(-1);
}
}
@@ -72,9 +72,8 @@ public class FluoCommand {
break;
case "start":
verifyNoArgs(remainArgs);
- runner.start(fluoInstall.getAppConfiguration(appName),
- fluoInstall.getAppConfDir(appName), fluoInstall.getAppLibDir(appName),
- fluoInstall.getLibDir());
+ runner.start(fluoInstall.getAppConfiguration(appName), fluoInstall.getAppConfDir(appName),
+ fluoInstall.getAppLibDir(appName), fluoInstall.getLibDir());
break;
case "scan":
runner.scan(fluoInstall.resolveFluoConfiguration(appName), remainArgs);
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index 7e938e9..ca62c69 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -19,6 +19,7 @@ import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+
import javax.inject.Provider;
import com.beust.jcommander.JCommander;
@@ -197,9 +198,7 @@ public abstract class AppRunner {
description = "Hex encode non ascii bytes", arity = 1)
public boolean hexEncNonAscii = true;
- @Parameter(
- names = "--raw",
- help = true,
+ @Parameter(names = "--raw", help = true,
description = "Show underlying key/values stored in Accumulo. Interprets the data using Fluo "
+ "internal schema, making it easier to comprehend.")
public boolean scanAccumuloTable = false;
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java
index e16ce0c..b54f04b 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java
@@ -115,12 +115,12 @@ public abstract class ClusterAppRunner extends AppRunner {
try {
config.validate();
} catch (IllegalArgumentException e) {
- System.err.println("Error - Invalid fluo.properties (" + propsPath + ") due to "
- + e.getMessage());
+ System.err
+ .println("Error - Invalid fluo.properties (" + propsPath + ") due to " + e.getMessage());
System.exit(-1);
} catch (Exception e) {
- System.err.println("Error - Invalid fluo.properties (" + propsPath + ") due to "
- + e.getMessage());
+ System.err
+ .println("Error - Invalid fluo.properties (" + propsPath + ") due to " + e.getMessage());
e.printStackTrace();
System.exit(-1);
}
@@ -176,8 +176,8 @@ public abstract class ClusterAppRunner extends AppRunner {
}
}
- System.out.println("Initializing Fluo '" + config.getApplicationName()
- + "' application using " + propsPath);
+ System.out.println(
+ "Initializing Fluo '" + config.getApplicationName() + "' application using " + propsPath);
try {
admin.initialize(initOpts);
} catch (Exception e) {
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
index 4a65384..f5c97c5 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
@@ -138,10 +138,8 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
if (twillIdExists(appConfig)) {
String twillId = getTwillId(appConfig);
yarnId = getAppId(appConfig);
- TwillController controller =
- getTwillRunner(appConfig).lookup(
- getYarnApplicationName(appConfig.getApplicationName()),
- RunIds.fromString(twillId));
+ TwillController controller = getTwillRunner(appConfig).lookup(
+ getYarnApplicationName(appConfig.getApplicationName()), RunIds.fromString(twillId));
if (controller == null) {
state = "STOPPED";
} else {
@@ -164,14 +162,13 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
if (twillIdExists(config)) {
String runId = getTwillId(config);
- TwillController controller =
- getTwillRunner(config).lookup(getYarnApplicationName(config.getApplicationName()),
- RunIds.fromString(runId));
+ TwillController controller = getTwillRunner(config)
+ .lookup(getYarnApplicationName(config.getApplicationName()), RunIds.fromString(runId));
if ((controller != null) && isReady(controller)) {
- throw new FluoException("A YARN application " + getAppInfo(config)
- + " is already running for the Fluo '" + config.getApplicationName()
- + "' application! Please stop it using 'fluo stop " + config.getApplicationName()
- + "' before starting a new one.");
+ throw new FluoException(
+ "A YARN application " + getAppInfo(config) + " is already running for the Fluo '"
+ + config.getApplicationName() + "' application! Please stop it using 'fluo stop "
+ + config.getApplicationName() + "' before starting a new one.");
}
}
@@ -259,9 +256,8 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
checkIfInitialized(config);
String twillId = verifyTwillId(config);
- TwillController controller =
- getTwillRunner(config).lookup(getYarnApplicationName(config.getApplicationName()),
- RunIds.fromString(twillId));
+ TwillController controller = getTwillRunner(config)
+ .lookup(getYarnApplicationName(config.getApplicationName()), RunIds.fromString(twillId));
if ((controller != null) && isReady(controller)) {
System.out.print("Stopping Fluo '" + config.getApplicationName() + "' application "
+ getAppInfo(config) + "...");
@@ -278,9 +274,8 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
String twillId = verifyTwillId(config);
- TwillController controller =
- getTwillRunner(config).lookup(getYarnApplicationName(config.getApplicationName()),
- RunIds.fromString(twillId));
+ TwillController controller = getTwillRunner(config)
+ .lookup(getYarnApplicationName(config.getApplicationName()), RunIds.fromString(twillId));
if (controller != null) {
System.out.print("Killing Fluo '" + config.getApplicationName() + "' application "
+ getAppInfo(config) + "...");
@@ -308,9 +303,8 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
}
elapsed += 500;
if ((maxWaitMs != -1) && (elapsed > maxWaitMs)) {
- String msg =
- String.format("Exceeded max wait time to retrieve ResourceReport from Twill."
- + " Elapsed time = %s ms", elapsed);
+ String msg = String.format("Exceeded max wait time to retrieve ResourceReport from Twill."
+ + " Elapsed time = %s ms", elapsed);
log.error(msg);
throw new IllegalStateException(msg);
}
@@ -354,9 +348,8 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
return;
}
String twillId = getTwillId(config);
- TwillController controller =
- getTwillRunner(config).lookup(getYarnApplicationName(config.getApplicationName()),
- RunIds.fromString(twillId));
+ TwillController controller = getTwillRunner(config)
+ .lookup(getYarnApplicationName(config.getApplicationName()), RunIds.fromString(twillId));
if (controller == null) {
System.out.print("Fluo '" + config.getApplicationName() + "' application "
+ getAppInfo(config) + " has stopped.");
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoInstall.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoInstall.java
index 447ef78..70ce7d0 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoInstall.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoInstall.java
@@ -144,7 +144,8 @@ public class FluoInstall {
private void verifyAppPath(String appName, String path) {
if (!(new File(path).exists())) {
- throw new FluoException("Path does not exist for Fluo '" + appName + "' application: " + path);
+ throw new FluoException(
+ "Path does not exist for Fluo '" + appName + "' application: " + path);
}
}
}
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
index 7308d8c..f0d8d73 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
@@ -78,9 +78,10 @@ public class FluoTwillApp implements TwillApplication {
final int workerMaxMemory = FluoYarnConfig.getWorkerMaxMemory(config);
final int workerNumCores = FluoYarnConfig.getWorkerNumCores(config);
- log.info("Configuring Fluo '{}' application with {} Oracle instances and {} Worker instances "
- + "with following properties:", config.getApplicationName(), oracleInstances,
- workerInstances);
+ log.info(
+ "Configuring Fluo '{}' application with {} Oracle instances and {} Worker instances "
+ + "with following properties:",
+ config.getApplicationName(), oracleInstances, workerInstances);
log.info("{} = {}", FluoYarnConfig.ORACLE_MAX_MEMORY_MB_PROP, oracleMaxMemory);
log.info("{} = {}", FluoYarnConfig.WORKER_MAX_MEMORY_MB_PROP, workerMaxMemory);
@@ -88,19 +89,16 @@ public class FluoTwillApp implements TwillApplication {
log.info("{} = {}", FluoYarnConfig.WORKER_NUM_CORES_PROP, workerNumCores);
// Start building Fluo Twill application
- MoreRunnable moreRunnable =
- TwillSpecification.Builder.with()
- .setName(YarnAppRunner.getYarnApplicationName(config.getApplicationName()))
- .withRunnable();
+ MoreRunnable moreRunnable = TwillSpecification.Builder.with()
+ .setName(YarnAppRunner.getYarnApplicationName(config.getApplicationName())).withRunnable();
// Configure Oracle(s)
ResourceSpecification oracleResources =
ResourceSpecification.Builder.with().setVirtualCores(oracleNumCores)
.setMemory(oracleMaxMemory, SizeUnit.MEGA).setInstances(oracleInstances).build();
- LocalFileAdder fileAdder =
- moreRunnable.add(OracleRunnable.ORACLE_NAME, new OracleRunnable(), oracleResources)
- .withLocalFiles();
+ LocalFileAdder fileAdder = moreRunnable
+ .add(OracleRunnable.ORACLE_NAME, new OracleRunnable(), oracleResources).withLocalFiles();
RunnableSetter runnableSetter = addConfigFiles(fileAdder).apply();
// Configure Worker(s)
@@ -108,9 +106,8 @@ public class FluoTwillApp implements TwillApplication {
ResourceSpecification.Builder.with().setVirtualCores(workerNumCores)
.setMemory(workerMaxMemory, SizeUnit.MEGA).setInstances(workerInstances).build();
- fileAdder =
- runnableSetter.add(WorkerRunnable.WORKER_NAME, new WorkerRunnable(), workerResources)
- .withLocalFiles();
+ fileAdder = runnableSetter
+ .add(WorkerRunnable.WORKER_NAME, new WorkerRunnable(), workerResources).withLocalFiles();
runnableSetter = addConfigFiles(fileAdder).apply();
// Set runnable order, build and return TwillSpecification
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoInit.java b/modules/command/src/main/java/org/apache/fluo/command/FluoInit.java
index 12d2021..7d83f34 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoInit.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoInit.java
@@ -105,8 +105,8 @@ public class FluoInit {
InitOptions opts = InitOptions.parse(args);
File applicationPropsFile = new File(opts.getAppPropsPath());
- Preconditions.checkArgument(applicationPropsFile.exists(), opts.getAppPropsPath()
- + " does not exist");
+ Preconditions.checkArgument(applicationPropsFile.exists(),
+ opts.getAppPropsPath() + " does not exist");
FluoConfiguration config = CommandUtil.resolveFluoConfig();
config.load(applicationPropsFile);
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
index 28caa07..61e05df 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
@@ -48,9 +48,7 @@ public class FluoScan {
description = "Hex encode non ascii bytes", arity = 1)
public boolean hexEncNonAscii = true;
- @Parameter(
- names = "--raw",
- help = true,
+ @Parameter(names = "--raw", help = true,
description = "Show underlying key/values stored in Accumulo. Interprets the data using Fluo "
+ "internal schema, making it easier to comprehend.")
public boolean scanAccumuloTable = false;
diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
index cb49e82..75a9fa1 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
@@ -36,8 +36,8 @@ import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.util.FluoExecutors;
import org.apache.fluo.core.util.Limit;
-public class AsyncConditionalWriter implements
- AsyncFunction<Collection<ConditionalMutation>, Iterator<Result>> {
+public class AsyncConditionalWriter
+ implements AsyncFunction<Collection<ConditionalMutation>, Iterator<Result>> {
private final ConditionalWriter cw;
private final ListeningExecutorService les;
@@ -46,12 +46,10 @@ public class AsyncConditionalWriter implements
public AsyncConditionalWriter(Environment env, ConditionalWriter cw) {
this.cw = cw;
- int numThreads =
- env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_CW_THREADS,
- FluoConfigurationImpl.ASYNC_CW_THREADS_DEFAULT);
- int permits =
- env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_CW_LIMIT,
- FluoConfigurationImpl.ASYNC_CW_LIMIT_DEFAULT);
+ int numThreads = env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_CW_THREADS,
+ FluoConfigurationImpl.ASYNC_CW_THREADS_DEFAULT);
+ int permits = env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_CW_LIMIT,
+ FluoConfigurationImpl.ASYNC_CW_LIMIT_DEFAULT);
this.les =
MoreExecutors.listeningDecorator(FluoExecutors.newFixedThreadPool(numThreads, "asyncCW"));
// the conditional writer currently has not memory limits... give it too much and it blows out
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 717d9f3..f25a110 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -94,22 +94,23 @@ public class FluoAdminImpl implements FluoAdmin {
}
@Override
- public void initialize(InitializationOptions opts) throws AlreadyInitializedException,
- TableExistsException {
+ public void initialize(InitializationOptions opts)
+ throws AlreadyInitializedException, TableExistsException {
if (!config.hasRequiredAdminProps()) {
throw new IllegalArgumentException("Admin configuration is missing required properties");
}
- Preconditions.checkArgument(!ZookeeperUtil.parseRoot(config.getInstanceZookeepers())
- .equals("/"), "The Zookeeper connection string (set by 'fluo.connection.zookeepers') "
- + " must have a chroot suffix.");
+ Preconditions.checkArgument(
+ !ZookeeperUtil.parseRoot(config.getInstanceZookeepers()).equals("/"),
+ "The Zookeeper connection string (set by 'fluo.connection.zookeepers') "
+ + " must have a chroot suffix.");
- Preconditions.checkArgument(config.getObserverJarsUrl().isEmpty()
- || config.getObserverInitDir().isEmpty(),
+ Preconditions.checkArgument(
+ config.getObserverJarsUrl().isEmpty() || config.getObserverInitDir().isEmpty(),
"Only one of 'fluo.observer.init.dir' and 'fluo.observer.jars.url' can be set");
if (zookeeperInitialized() && !opts.getClearZookeeper()) {
- throw new AlreadyInitializedException("Fluo application already initialized at "
- + config.getAppZookeepers());
+ throw new AlreadyInitializedException(
+ "Fluo application already initialized at " + config.getAppZookeepers());
}
Connector conn = AccumuloUtil.getConnector(config);
@@ -166,8 +167,8 @@ public class FluoAdminImpl implements FluoAdmin {
String contextName = "fluo-" + config.getApplicationName();
conn.instanceOperations().setProperty(
AccumuloProps.VFS_CONTEXT_CLASSPATH_PROPERTY + contextName, accumuloClasspath);
- conn.tableOperations().setProperty(config.getAccumuloTable(),
- AccumuloProps.TABLE_CLASSPATH, contextName);
+ conn.tableOperations().setProperty(config.getAccumuloTable(), AccumuloProps.TABLE_CLASSPATH,
+ contextName);
}
if (config.getObserverJarsUrl().isEmpty() && !config.getObserverInitDir().trim().isEmpty()) {
@@ -205,8 +206,9 @@ public class FluoAdminImpl implements FluoAdmin {
// TODO set Fluo data version
CuratorUtil.putData(curator, ZookeeperPath.CONFIG, new byte[0],
CuratorUtil.NodeExistsPolicy.FAIL);
- CuratorUtil.putData(curator, ZookeeperPath.CONFIG_ACCUMULO_TABLE, config.getAccumuloTable()
- .getBytes(StandardCharsets.UTF_8), CuratorUtil.NodeExistsPolicy.FAIL);
+ CuratorUtil.putData(curator, ZookeeperPath.CONFIG_ACCUMULO_TABLE,
+ config.getAccumuloTable().getBytes(StandardCharsets.UTF_8),
+ CuratorUtil.NodeExistsPolicy.FAIL);
CuratorUtil.putData(curator, ZookeeperPath.CONFIG_ACCUMULO_INSTANCE_NAME,
accumuloInstanceName.getBytes(StandardCharsets.UTF_8), CuratorUtil.NodeExistsPolicy.FAIL);
CuratorUtil.putData(curator, ZookeeperPath.CONFIG_ACCUMULO_INSTANCE_ID,
@@ -348,8 +350,8 @@ public class FluoAdminImpl implements FluoAdmin {
public static SimpleConfiguration getZookeeperConfig(FluoConfiguration config) {
if (!isInitialized(config)) {
- throw new IllegalStateException("Fluo Application '" + config.getApplicationName()
- + "' has not been initialized");
+ throw new IllegalStateException(
+ "Fluo Application '" + config.getApplicationName() + "' has not been initialized");
}
SimpleConfiguration zooConfig = new SimpleConfiguration();
@@ -384,9 +386,8 @@ public class FluoAdminImpl implements FluoAdmin {
ClassLoader cl = FluoAdminImpl.class.getClassLoader();
URL[] urls = ((URLClassLoader) cl).getURLs();
- String regex =
- config.getString(FluoConfigurationImpl.ACCUMULO_JARS_REGEX_PROP,
- FluoConfigurationImpl.ACCUMULO_JARS_REGEX_DEFAULT);
+ String regex = config.getString(FluoConfigurationImpl.ACCUMULO_JARS_REGEX_PROP,
+ FluoConfigurationImpl.ACCUMULO_JARS_REGEX_DEFAULT);
Pattern pattern = Pattern.compile(regex);
for (URL url : urls) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
index c1cf6d8..2274990 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
@@ -48,8 +48,8 @@ public class FluoClientImpl implements FluoClient {
public static final AutoCloseable setupReporters(Environment env, String id,
AtomicInteger reporterCounter) {
- return ReporterUtil.setupReporters(env, FluoConfiguration.FLUO_PREFIX + "." + id + "."
- + reporterCounter.getAndIncrement());
+ return ReporterUtil.setupReporters(env,
+ FluoConfiguration.FLUO_PREFIX + "." + id + "." + reporterCounter.getAndIncrement());
}
public FluoClientImpl(FluoConfiguration connConfig) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
index 916dcca..5f644e8 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
@@ -37,7 +37,6 @@ import org.apache.fluo.core.util.Counter;
import org.apache.fluo.core.util.FluoExecutors;
import org.slf4j.LoggerFactory;
-
public class LoaderExecutorAsyncImpl implements LoaderExecutor {
private final ExecutorService executor;
private final Semaphore semaphore;
@@ -51,8 +50,8 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
private void setException(Throwable t) {
if (!exceptionRef.compareAndSet(null, t)) {
- LoggerFactory.getLogger(LoaderExecutorAsyncImpl.class).debug(
- "Multiple exceptions occured, not reporting subsequent ones", t);
+ LoggerFactory.getLogger(LoaderExecutorAsyncImpl.class)
+ .debug("Multiple exceptions occured, not reporting subsequent ones", t);
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index 2bfdbd7..f8aac72 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -31,8 +31,8 @@ import org.apache.fluo.api.metrics.MetricsReporter;
import org.apache.fluo.core.client.FluoAdminImpl;
import org.apache.fluo.core.metrics.MetricNames;
import org.apache.fluo.core.metrics.MetricsReporterImpl;
-import org.apache.fluo.core.observer.RegisteredObservers;
import org.apache.fluo.core.observer.ObserverUtil;
+import org.apache.fluo.core.observer.RegisteredObservers;
import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.fluo.core.util.CuratorUtil;
@@ -118,9 +118,8 @@ public class Environment implements AutoCloseable {
new String(curator.getData().forPath(ZookeeperPath.CONFIG_FLUO_APPLICATION_ID),
StandardCharsets.UTF_8);
- table =
- new String(curator.getData().forPath(ZookeeperPath.CONFIG_ACCUMULO_TABLE),
- StandardCharsets.UTF_8);
+ table = new String(curator.getData().forPath(ZookeeperPath.CONFIG_ACCUMULO_TABLE),
+ StandardCharsets.UTF_8);
observers = ObserverUtil.load(curator);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
index e3805dc..3e5ee85 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
@@ -27,15 +27,15 @@ public class FluoConfigurationImpl {
public static final String ORACLE_PORT_PROP = FLUO_IMPL_PREFIX + ".oracle.port";
public static final String WORKER_FINDER_PROP = FLUO_IMPL_PREFIX + ".worker.finder";
- public static final String WORKER_PARTITION_GROUP_SIZE = FLUO_IMPL_PREFIX
- + ".worker.finder.partition.groupSize";
+ public static final String WORKER_PARTITION_GROUP_SIZE =
+ FLUO_IMPL_PREFIX + ".worker.finder.partition.groupSize";
public static final int WORKER_PARTITION_GROUP_SIZE_DEFAULT = 7;
public static final String METRICS_RESERVOIR_PROP = FLUO_IMPL_PREFIX + ".metrics.reservoir";
- public static final String NTFY_FINDER_MIN_SLEEP_TIME_PROP = FLUO_IMPL_PREFIX
- + ".worker.finder.minSleep";
+ public static final String NTFY_FINDER_MIN_SLEEP_TIME_PROP =
+ FLUO_IMPL_PREFIX + ".worker.finder.minSleep";
public static final int NTFY_FINDER_MIN_SLEEP_TIME_DEFAULT = 5000;
- public static final String NTFY_FINDER_MAX_SLEEP_TIME_PROP = FLUO_IMPL_PREFIX
- + ".worker.finder.maxSleep";
+ public static final String NTFY_FINDER_MAX_SLEEP_TIME_PROP =
+ FLUO_IMPL_PREFIX + ".worker.finder.maxSleep";
public static final int NTFY_FINDER_MAX_SLEEP_TIME_DEFAULT = 5 * 60 * 1000;
public static final String ACCUMULO_JARS_REGEX_PROP = FLUO_IMPL_PREFIX + ".accumulo.jars.regex";
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index 6ff9437..f3068e9 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -60,8 +60,8 @@ public class LockResolver {
for (Entry<Key, Value> lock : locks) {
LockValue lockVal = new LockValue(lock.getValue().get());
PrimaryRowColumn prc =
- new PrimaryRowColumn(lockVal.getPrimaryRow(), lockVal.getPrimaryColumn(), lock.getKey()
- .getTimestamp() & ColumnConstants.TIMESTAMP_MASK);
+ new PrimaryRowColumn(lockVal.getPrimaryRow(), lockVal.getPrimaryColumn(),
+ lock.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK);
List<Entry<Key, Value>> lockList = groupedLocks.get(prc);
if (lockList == null) {
@@ -169,8 +169,8 @@ public class LockResolver {
break;
case UNKNOWN:
default:
- throw new IllegalStateException("can not abort : " + group.getKey() + " ("
- + txInfo.status + ")");
+ throw new IllegalStateException(
+ "can not abort : " + group.getKey() + " (" + txInfo.status + ")");
}
}
@@ -204,9 +204,8 @@ public class LockResolver {
IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
PrewriteIterator.setSnaptime(iterConf, startTs);
- ConditionalFlutation delLockMutation =
- new ConditionalFlutation(env, prc.prow, new FluoCondition(env, prc.pcol).setIterators(
- iterConf).setValue(lockValue));
+ ConditionalFlutation delLockMutation = new ConditionalFlutation(env, prc.prow,
+ new FluoCondition(env, prc.pcol).setIterators(iterConf).setValue(lockValue));
delLockMutation.put(prc.pcol, ColumnConstants.DEL_LOCK_PREFIX | prc.startTs,
DelLockValue.encodeRollback(true, true));
@@ -235,8 +234,8 @@ public class LockResolver {
long lockTs = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
// TODO may be that a stronger sanity check that could be done here
if (commitTs < lockTs) {
- throw new IllegalStateException("bad commitTs : " + entry.getKey() + " (" + commitTs + "<"
- + lockTs + ")");
+ throw new IllegalStateException(
+ "bad commitTs : " + entry.getKey() + " (" + commitTs + "<" + lockTs + ")");
}
Mutation mut = getMutation(entry.getKey().getRowData(), mutations);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
index 16470f2..37e0df5 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
@@ -63,7 +63,8 @@ public class ParallelSnapshotScanner {
this.columnConverter = new CachedColumnConverter(columns);
}
- ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs, TxStats stats) {
+ ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs,
+ TxStats stats) {
for (RowColumn rc : cells) {
byte[] r = rc.getRow().toArray();
byte[] cf = rc.getColumn().getFamily().toArray();
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
index 96052f2..2b53cbf 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
@@ -107,7 +107,8 @@ public class SharedBatchWriter {
}
- private void processBatches(ArrayList<MutationBatch> batches) throws MutationsRejectedException {
+ private void processBatches(ArrayList<MutationBatch> batches)
+ throws MutationsRejectedException {
for (MutationBatch mutationBatch : batches) {
if (mutationBatch != end) {
bw.addMutations(mutationBatch.mutations);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedResources.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedResources.java
index e9a121a..b629fdf 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedResources.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedResources.java
@@ -70,35 +70,27 @@ public class SharedResources implements AutoCloseable {
int numTservers = env.getConnector().instanceOperations().getTabletServers().size();
int numBWThreads = FluoConfigurationImpl.getNumBWThreads(env.getConfiguration(), numTservers);
- bw =
- env.getConnector().createBatchWriter(env.getTable(),
- new BatchWriterConfig().setMaxWriteThreads(numBWThreads));
+ bw = env.getConnector().createBatchWriter(env.getTable(),
+ new BatchWriterConfig().setMaxWriteThreads(numBWThreads));
sbw = new SharedBatchWriter(bw);
int numCWThreads = FluoConfigurationImpl.getNumCWThreads(env.getConfiguration(), numTservers);
- cw =
- env.getConnector().createConditionalWriter(
- env.getTable(),
- new ConditionalWriterConfig().setAuthorizations(env.getAuthorizations())
- .setMaxWriteThreads(numCWThreads));
+ cw = env.getConnector().createConditionalWriter(env.getTable(), new ConditionalWriterConfig()
+ .setAuthorizations(env.getAuthorizations()).setMaxWriteThreads(numCWThreads));
bulkCw =
- env.getConnector().createConditionalWriter(
- env.getTable(),
- new ConditionalWriterConfig().setAuthorizations(env.getAuthorizations())
- .setMaxWriteThreads(numCWThreads));
+ env.getConnector().createConditionalWriter(env.getTable(), new ConditionalWriterConfig()
+ .setAuthorizations(env.getAuthorizations()).setMaxWriteThreads(numCWThreads));
txInfoCache = new TxInfoCache(env);
visCache = new VisibilityCache();
metricRegistry = new MetricRegistry();
- int commitThreads =
- env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_COMMIT_THREADS,
- FluoConfigurationImpl.ASYNC_COMMIT_THREADS_DEFAULT);
+ int commitThreads = env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_COMMIT_THREADS,
+ FluoConfigurationImpl.ASYNC_COMMIT_THREADS_DEFAULT);
asyncCommitExecutor = FluoExecutors.newFixedThreadPool(commitThreads, "async-commits");
- commitThreads =
- env.getConfiguration().getInt(FluoConfigurationImpl.SYNC_COMMIT_THREADS,
- FluoConfigurationImpl.SYNC_COMMIT_THREADS_DEFAULT);
+ commitThreads = env.getConfiguration().getInt(FluoConfigurationImpl.SYNC_COMMIT_THREADS,
+ FluoConfigurationImpl.SYNC_COMMIT_THREADS_DEFAULT);
syncCommitExecutor = FluoExecutors.newFixedThreadPool(commitThreads, "sync-commits");
acw = new AsyncConditionalWriter(env, cw);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TimestampTracker.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TimestampTracker.java
index 69e5e6d..7354d4e 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TimestampTracker.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TimestampTracker.java
@@ -159,12 +159,12 @@ public class TimestampTracker implements AutoCloseable {
*/
public synchronized void removeTimestamp(long ts) throws NoSuchElementException {
Preconditions.checkState(!closed, "tracker closed ");
- Preconditions.checkState(allocationsInProgress > 0, "allocationsInProgress should be > 0 "
- + allocationsInProgress);
+ Preconditions.checkState(allocationsInProgress > 0,
+ "allocationsInProgress should be > 0 " + allocationsInProgress);
Objects.requireNonNull(node);
if (timestamps.remove(ts) == false) {
- throw new NoSuchElementException("Timestamp " + ts
- + " was previously removed or does not exist");
+ throw new NoSuchElementException(
+ "Timestamp " + ts + " was previously removed or does not exist");
}
allocationsInProgress--;
@@ -176,9 +176,8 @@ public class TimestampTracker implements AutoCloseable {
private void createZkNode(long ts) {
Preconditions.checkState(node == null, "expected node to be null");
- node =
- new PersistentEphemeralNode(env.getSharedResources().getCurator(), Mode.EPHEMERAL,
- getNodePath(), LongUtil.toByteArray(ts));
+ node = new PersistentEphemeralNode(env.getSharedResources().getCurator(), Mode.EPHEMERAL,
+ getNodePath(), LongUtil.toByteArray(ts));
CuratorUtil.startAndWait(node, 10);
zkTimestamp = ts;
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 04d5ef9..aa5e7da 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -88,10 +88,10 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
public static final byte[] EMPTY = new byte[0];
public static final Bytes EMPTY_BS = Bytes.of(EMPTY);
- private static final Bytes DELETE = Bytes
- .of("special delete object f804266bf94935edd45ae3e6c287b93c1814295c");
- private static final Bytes NTFY_VAL = Bytes
- .of("special ntfy value ce0c523e6e4dc093be8a2736b82eca1b95f97ed4");
+ private static final Bytes DELETE =
+ Bytes.of("special delete object f804266bf94935edd45ae3e6c287b93c1814295c");
+ private static final Bytes NTFY_VAL =
+ Bytes.of("special ntfy value ce0c523e6e4dc093be8a2736b82eca1b95f97ed4");
private static boolean isWrite(Bytes val) {
return val != NTFY_VAL;
@@ -529,15 +529,14 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
for (ColumnUpdate cu : updates) {
// TODO avoid create col vis object
- Column col =
- new Column(Bytes.of(cu.getColumnFamily()), Bytes.of(cu.getColumnQualifier()),
- Bytes.of(cu.getColumnVisibility()));
+ Column col = new Column(Bytes.of(cu.getColumnFamily()), Bytes.of(cu.getColumnQualifier()),
+ Bytes.of(cu.getColumnVisibility()));
if (notification.getColumn().equals(col)) {
// check to see if ACK exist after notification
Key startKey = SpanUtil.toKey(notification.getRowColumn());
- startKey.setTimestamp(ColumnConstants.ACK_PREFIX
- | (Long.MAX_VALUE & ColumnConstants.TIMESTAMP_MASK));
+ startKey.setTimestamp(
+ ColumnConstants.ACK_PREFIX | (Long.MAX_VALUE & ColumnConstants.TIMESTAMP_MASK));
Key endKey = SpanUtil.toKey(notification.getRowColumn());
endKey.setTimestamp(ColumnConstants.ACK_PREFIX | (notification.getTimestamp() + 1));
@@ -779,7 +778,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
}
}
- private void beginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback, RowColumn primary) {
+ private void beginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback,
+ RowColumn primary) {
if (updates.size() == 0) {
// TODO do async
@@ -871,8 +871,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
break;
case COMMITTED:
default:
- throw new IllegalStateException("unexpected tx state " + txInfo.status + " " + cd.prow
- + " " + cd.pcol);
+ throw new IllegalStateException(
+ "unexpected tx state " + txInfo.status + " " + cd.prow + " " + cd.pcol);
}
}
@@ -901,9 +901,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
if (cm == null) {
- cm =
- prewrite(rowUpdates.getKey(), colUpdates.getKey(), colUpdates.getValue(), cd.prow,
- cd.pcol, false);
+ cm = prewrite(rowUpdates.getKey(), colUpdates.getKey(), colUpdates.getValue(), cd.prow,
+ cd.pcol, false);
} else {
prewrite(cm, colUpdates.getKey(), colUpdates.getValue(), cd.prow, cd.pcol, false);
}
@@ -1084,9 +1083,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn());
Condition lockCheck =
- new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(
- LockValue.encode(cd.prow, cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger,
- getTransactorID()));
+ new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow,
+ cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID()));
final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck);
ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
@@ -1124,8 +1122,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
switch (txInfo.status) {
case COMMITTED:
if (txInfo.commitTs != commitTs) {
- throw new IllegalStateException(cd.prow + " " + cd.pcol + " " + txInfo.commitTs
- + "!=" + commitTs);
+ throw new IllegalStateException(
+ cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs);
}
ms = Status.ACCEPTED;
break;
@@ -1191,8 +1189,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
}
@VisibleForTesting
- public boolean finishCommit(CommitData cd, Stamp commitStamp) throws TableNotFoundException,
- MutationsRejectedException {
+ public boolean finishCommit(CommitData cd, Stamp commitStamp)
+ throws TableNotFoundException, MutationsRejectedException {
deleteLocks(cd, commitStamp.getTxTimestamp());
return true;
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorCache.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorCache.java
index ecf840a..c2d0d1a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorCache.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorCache.java
@@ -48,15 +48,13 @@ public class TransactorCache implements AutoCloseable {
public TransactorCache(Environment env) {
- timeoutCache =
- CacheBuilder.newBuilder().maximumSize(1 << 15)
- .expireAfterAccess(TxInfoCache.CACHE_TIMEOUT_MIN, TimeUnit.MINUTES)
- .concurrencyLevel(10).build();
+ timeoutCache = CacheBuilder.newBuilder().maximumSize(1 << 15)
+ .expireAfterAccess(TxInfoCache.CACHE_TIMEOUT_MIN, TimeUnit.MINUTES).concurrencyLevel(10)
+ .build();
this.env = env;
- cache =
- new PathChildrenCache(env.getSharedResources().getCurator(),
- ZookeeperPath.TRANSACTOR_NODES, true);
+ cache = new PathChildrenCache(env.getSharedResources().getCurator(),
+ ZookeeperPath.TRANSACTOR_NODES, true);
try {
cache.start(StartMode.BUILD_INITIAL_CACHE);
status = TcStatus.OPEN;
@@ -71,7 +69,8 @@ public class TransactorCache implements AutoCloseable {
lockTs);
}
- public void addTimedoutTransactor(final Long transactorId, final long lockTs, final Long startTime) {
+ public void addTimedoutTransactor(final Long transactorId, final long lockTs,
+ final Long startTime) {
try {
AtomicLong cachedLockTs = timeoutCache.get(transactorId, new Callable<AtomicLong>() {
@Override
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorID.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorID.java
index 7e4a5f9..a2b2d29 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorID.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorID.java
@@ -58,9 +58,8 @@ public class TransactorID {
private static Long createID(CuratorFramework curator) {
try {
- DistributedAtomicLong counter =
- new DistributedAtomicLong(curator, ZookeeperPath.TRANSACTOR_COUNT,
- new ExponentialBackoffRetry(1000, 10));
+ DistributedAtomicLong counter = new DistributedAtomicLong(curator,
+ ZookeeperPath.TRANSACTOR_COUNT, new ExponentialBackoffRetry(1000, 10));
AtomicValue<Long> nextId = counter.increment();
while (nextId.succeeded() == false) {
nextId = counter.increment();
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorNode.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorNode.java
index f567c19..f04288d 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorNode.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactorNode.java
@@ -51,9 +51,8 @@ public class TransactorNode implements AutoCloseable {
public TransactorNode(Environment env, TransactorID tid) {
this.env = env;
this.tid = tid;
- node =
- new PersistentEphemeralNode(env.getSharedResources().getCurator(), Mode.EPHEMERAL,
- getNodePath(), tid.toString().getBytes());
+ node = new PersistentEphemeralNode(env.getSharedResources().getCurator(), Mode.EPHEMERAL,
+ getNodePath(), tid.toString().getBytes());
CuratorUtil.startAndWait(node, 10);
status = TrStatus.OPEN;
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
index 56b02ae..849e2b5 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java
@@ -80,8 +80,8 @@ public class TxInfo {
if (timePtr != startTs) {
// expect this to always be false, must be a bug in the iterator
- throw new IllegalStateException(prow + " " + pcol + " (" + timePtr + " != " + startTs
- + ") ");
+ throw new IllegalStateException(
+ prow + " " + pcol + " (" + timePtr + " != " + startTs + ") ");
}
txInfo.status = TxStatus.COMMITTED;
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfoCache.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfoCache.java
index 9b42649..ed43be2 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfoCache.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfoCache.java
@@ -38,9 +38,8 @@ public class TxInfoCache {
private final Environment env;
TxInfoCache(Environment env) {
- cache =
- CacheBuilder.newBuilder().expireAfterAccess(CACHE_TIMEOUT_MIN, TimeUnit.MINUTES)
- .maximumWeight(10000000).weigher(new TxStatusWeigher()).concurrencyLevel(10).build();
+ cache = CacheBuilder.newBuilder().expireAfterAccess(CACHE_TIMEOUT_MIN, TimeUnit.MINUTES)
+ .maximumWeight(10000000).weigher(new TxStatusWeigher()).concurrencyLevel(10).build();
this.env = env;
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java
index 030014c..3b2af5a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java
@@ -149,8 +149,8 @@ public class TxStats {
MetricsUtil.getTimer(env.getConfiguration(), registry, names.getTxLockWaitTime(alias))
.update(getLockWaitTime(), TimeUnit.MILLISECONDS);
}
- MetricsUtil.getTimer(env.getConfiguration(), registry, names.getTxExecTime(alias)).update(
- getReadTime(), TimeUnit.MILLISECONDS);
+ MetricsUtil.getTimer(env.getConfiguration(), registry, names.getTxExecTime(alias))
+ .update(getReadTime(), TimeUnit.MILLISECONDS);
if (getCollisions() > 0) {
registry.meter(names.getTxWithCollision(alias)).mark();
registry.meter(names.getTxCollisions(alias)).mark(getCollisions());
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/VisibilityCache.java b/modules/core/src/main/java/org/apache/fluo/core/impl/VisibilityCache.java
index 156a908..06b10c4 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/VisibilityCache.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/VisibilityCache.java
@@ -48,8 +48,7 @@ public class VisibilityCache {
VisibilityCache() {
visCache =
- CacheBuilder.newBuilder()
- .expireAfterAccess(TxInfoCache.CACHE_TIMEOUT_MIN, TimeUnit.MINUTES)
+ CacheBuilder.newBuilder().expireAfterAccess(TxInfoCache.CACHE_TIMEOUT_MIN, TimeUnit.MINUTES)
.maximumWeight(10000000).weigher(new VisWeigher()).concurrencyLevel(10).build();
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingCellScanner.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingCellScanner.java
index 9d77315..b31b2c3 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingCellScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingCellScanner.java
@@ -40,12 +40,10 @@ public class TracingCellScanner implements CellScanner {
@Override
public Iterator<RowColumnValue> iterator() {
- return Iterators.transform(
- wrappedScanner.iterator(),
- rcv -> {
- log.trace("txid: {} scanId: {} next()-> {} {}", txid, scanId,
- Hex.encNonAscii(rcv.getRowColumn()), Hex.encNonAscii(rcv.getValue()));
- return rcv;
- });
+ return Iterators.transform(wrappedScanner.iterator(), rcv -> {
+ log.trace("txid: {} scanId: {} next()-> {} {}", txid, scanId,
+ Hex.encNonAscii(rcv.getRowColumn()), Hex.encNonAscii(rcv.getValue()));
+ return rcv;
+ });
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingColumnScanner.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingColumnScanner.java
index d950a80..ec8413f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingColumnScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingColumnScanner.java
@@ -42,13 +42,11 @@ public class TracingColumnScanner implements ColumnScanner {
@Override
public Iterator<ColumnValue> iterator() {
- return Iterators.transform(
- cs.iterator(),
- cv -> {
- log.trace("txid: {} scanId: {} next()-> {} {} {}", txid, scanId, encRow,
- Hex.encNonAscii(cv.getColumn()), Hex.encNonAscii(cv.getValue()));
- return cv;
- });
+ return Iterators.transform(cs.iterator(), cv -> {
+ log.trace("txid: {} scanId: {} next()-> {} {} {}", txid, scanId, encRow,
+ Hex.encNonAscii(cv.getColumn()), Hex.encNonAscii(cv.getValue()));
+ return cv;
+ });
}
@Override
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
index 0753cd4..a4c35c3 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
@@ -39,14 +39,14 @@ import org.slf4j.LoggerFactory;
import static org.apache.fluo.core.util.Hex.encNonAscii;
-public class TracingTransaction extends AbstractTransactionBase implements AsyncTransaction,
- Snapshot {
+public class TracingTransaction extends AbstractTransactionBase
+ implements AsyncTransaction, Snapshot {
private static final Logger log = LoggerFactory.getLogger(FluoConfiguration.TRANSACTION_PREFIX);
- private static final Logger collisionLog = LoggerFactory
- .getLogger(FluoConfiguration.TRANSACTION_PREFIX + ".collisions");
- private static final Logger summaryLog = LoggerFactory
- .getLogger(FluoConfiguration.TRANSACTION_PREFIX + ".summary");
+ private static final Logger collisionLog =
+ LoggerFactory.getLogger(FluoConfiguration.TRANSACTION_PREFIX + ".collisions");
+ private static final Logger summaryLog =
+ LoggerFactory.getLogger(FluoConfiguration.TRANSACTION_PREFIX + ".summary");
private final AsyncTransaction tx;
private final long txid;
@@ -221,8 +221,8 @@ public class TracingTransaction extends AbstractTransactionBase implements Async
collisionLog.trace("txid: {} class: {}", txid, clazz.getName());
}
- collisionLog.trace("txid: {} collisions: {}", txid, toStringEncNonAsciiMBSC(tx.getStats()
- .getRejected()));
+ collisionLog.trace("txid: {} collisions: {}", txid,
+ toStringEncNonAsciiMBSC(tx.getStats().getRejected()));
}
@Override
@@ -235,11 +235,12 @@ public class TracingTransaction extends AbstractTransactionBase implements Async
className = clazz.getSimpleName();
}
// TODO log total # read, see fluo-426
- summaryLog.trace("txid: {} thread : {} time: {} ({} {}) #ret: {} #set: {} #collisions: {} "
- + "waitTime: {} committed: {} class: {}", txid, Thread.currentThread().getId(),
- stats.getTime(), stats.getReadTime(), stats.getCommitTime(), stats.getEntriesReturned(),
- stats.getEntriesSet(), stats.getCollisions(), stats.getLockWaitTime(), committed,
- className);
+ summaryLog.trace(
+ "txid: {} thread : {} time: {} ({} {}) #ret: {} #set: {} #collisions: {} "
+ + "waitTime: {} committed: {} class: {}",
+ txid, Thread.currentThread().getId(), stats.getTime(), stats.getReadTime(),
+ stats.getCommitTime(), stats.getEntriesReturned(), stats.getEntriesSet(),
+ stats.getCollisions(), stats.getLockWaitTime(), committed, className);
}
tx.close();
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricNames.java b/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricNames.java
index 3842a84..9a40f01 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricNames.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricNames.java
@@ -20,8 +20,8 @@ import org.apache.fluo.api.config.FluoConfiguration;
public class MetricNames {
- public static final String METRICS_REPORTER_ID_PROP = FluoConfiguration.FLUO_PREFIX
- + ".metrics.reporter.id";
+ public static final String METRICS_REPORTER_ID_PROP =
+ FluoConfiguration.FLUO_PREFIX + ".metrics.reporter.id";
// Metrics prefixes for 'default' metrics
public static final String CLASS_PREFIX = FluoConfiguration.FLUO_PREFIX + ".class";
@@ -48,8 +48,8 @@ public class MetricNames {
private final String oracleServerStamps;
public MetricNames(String metricsReporterId, String appName) {
- Preconditions.checkArgument(!appName.contains("."), "Fluo App name should not contain '.': "
- + appName);
+ Preconditions.checkArgument(!appName.contains("."),
+ "Fluo App name should not contain '.': " + appName);
Preconditions.checkArgument(!metricsReporterId.contains("."),
"Metrics Reporter ID should not contain '.': " + metricsReporterId);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricsReporterImpl.java b/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricsReporterImpl.java
index d29c9cb..8804571 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricsReporterImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricsReporterImpl.java
@@ -72,7 +72,7 @@ public class MetricsReporterImpl implements MetricsReporter {
private static void validateName(String metricName) {
Objects.requireNonNull(metricName);
- Preconditions.checkArgument(!metricName.contains("."), "Metric name " + metricName
- + " should not contain a period '.'");
+ Preconditions.checkArgument(!metricName.contains("."),
+ "Metric name " + metricName + " should not contain a period '.'");
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricsUtil.java b/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricsUtil.java
index ce87bbb..8e64cce 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricsUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/metrics/MetricsUtil.java
@@ -26,9 +26,8 @@ import org.mpierce.metrics.reservoir.hdrhistogram.HdrHistogramResetOnSnapshotRes
public class MetricsUtil {
public static Reservoir getConfiguredReservoir(FluoConfiguration config) {
- String clazz =
- config.getString(FluoConfigurationImpl.METRICS_RESERVOIR_PROP,
- HdrHistogramResetOnSnapshotReservoir.class.getName());
+ String clazz = config.getString(FluoConfigurationImpl.METRICS_RESERVOIR_PROP,
+ HdrHistogramResetOnSnapshotReservoir.class.getName());
try {
return Class.forName(clazz).asSubclass(Reservoir.class).newInstance();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/metrics/ReporterUtil.java b/modules/core/src/main/java/org/apache/fluo/core/metrics/ReporterUtil.java
index 8569f42..f40e122 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/metrics/ReporterUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/metrics/ReporterUtil.java
@@ -65,9 +65,8 @@ public class ReporterUtil {
}
final String hdrSnapshotClass = HdrHistogramResetOnSnapshotReservoir.class.getName();
- String clazz =
- env.getConfiguration().getString(FluoConfigurationImpl.METRICS_RESERVOIR_PROP,
- hdrSnapshotClass);
+ String clazz = env.getConfiguration().getString(FluoConfigurationImpl.METRICS_RESERVOIR_PROP,
+ hdrSnapshotClass);
if ((allReporters.size() > 1) && (clazz.equals(hdrSnapshotClass))) {
throw new IllegalStateException("Multiple metrics reporters cannot be configured when using "
+ hdrSnapshotClass + " as corrupt metrics can be reported");
diff --git a/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/ConsoleReporterStarter.java b/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/ConsoleReporterStarter.java
index 37c5398..1413ee4 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/ConsoleReporterStarter.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/ConsoleReporterStarter.java
@@ -48,9 +48,8 @@ public class ConsoleReporterStarter implements ReporterStarter {
out = System.err;
}
- ConsoleReporter reporter =
- ConsoleReporter.forRegistry(params.getMetricRegistry()).convertDurationsTo(durationUnit)
- .convertRatesTo(rateUnit).outputTo(out).build();
+ ConsoleReporter reporter = ConsoleReporter.forRegistry(params.getMetricRegistry())
+ .convertDurationsTo(durationUnit).convertRatesTo(rateUnit).outputTo(out).build();
reporter.start(config.getInt("frequency", 60), TimeUnit.SECONDS);
log.info("Reporting metrics to console");
diff --git a/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/CsvReporterStarter.java b/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/CsvReporterStarter.java
index bd1c2f5..824561d 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/CsvReporterStarter.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/CsvReporterStarter.java
@@ -45,9 +45,8 @@ public class CsvReporterStarter implements ReporterStarter {
TimeUnit durationUnit =
TimeUnit.valueOf(config.getString("durationUnit", "milliseconds").toUpperCase());
- CsvReporter reporter =
- CsvReporter.forRegistry(params.getMetricRegistry()).convertDurationsTo(durationUnit)
- .convertRatesTo(rateUnit).build(new File(dir));
+ CsvReporter reporter = CsvReporter.forRegistry(params.getMetricRegistry())
+ .convertDurationsTo(durationUnit).convertRatesTo(rateUnit).build(new File(dir));
reporter.start(config.getInt("frequency", 60), TimeUnit.SECONDS);
log.info("Reporting metrics as csv to directory {}", dir);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/Slf4jReporterStarter.java b/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/Slf4jReporterStarter.java
index e38f6b1..1e231da 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/Slf4jReporterStarter.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/metrics/starters/Slf4jReporterStarter.java
@@ -44,9 +44,8 @@ public class Slf4jReporterStarter implements ReporterStarter {
TimeUnit.valueOf(config.getString("durationUnit", "milliseconds").toUpperCase());
Logger logger = LoggerFactory.getLogger(config.getString("logger", "metrics"));
- Slf4jReporter reporter =
- Slf4jReporter.forRegistry(params.getMetricRegistry()).convertDurationsTo(durationUnit)
- .convertRatesTo(rateUnit).outputTo(logger).build();
+ Slf4jReporter reporter = Slf4jReporter.forRegistry(params.getMetricRegistry())
+ .convertDurationsTo(durationUnit).convertRatesTo(rateUnit).outputTo(logger).build();
reporter.start(config.getInt("frequency", 60), TimeUnit.SECONDS);
log.info("Reporting metrics using slf4j");
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
index c2fc35a..feba76e 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
@@ -78,8 +78,8 @@ public class ObserverStoreV1 implements ObserverStore {
+ "found. Check for class name misspellings or failure to include "
+ "the observer jar.", e1);
} catch (InstantiationException | IllegalAccessException e2) {
- throw new FluoException("Observer class '" + ospec.getClassName()
- + "' could not be created.", e2);
+ throw new FluoException(
+ "Observer class '" + ospec.getClassName() + "' could not be created.", e2);
}
SimpleConfiguration oc = ospec.getConfiguration();
@@ -115,8 +115,8 @@ public class ObserverStoreV1 implements ObserverStore {
} catch (NoNodeException nne) {
// it's ok if node doesn't exist
} catch (Exception e) {
- logger.error("An error occurred deleting Zookeeper node. node=[" + observerPath
- + "], error=[" + e.getMessage() + "]");
+ logger.error("An error occurred deleting Zookeeper node. node=[" + observerPath + "], error=["
+ + e.getMessage() + "]");
throw new RuntimeException(e);
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
index f75377e..ff3c667 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
@@ -103,9 +103,9 @@ class ObserversV1 implements Observers {
}
if (!observer.getObservedColumn().getColumn().equals(col)) {
- throw new IllegalStateException("Mismatch between configured column and class column "
- + observerConfig.getClassName() + " " + col + " "
- + observer.getObservedColumn().getColumn());
+ throw new IllegalStateException(
+ "Mismatch between configured column and class column " + observerConfig.getClassName()
+ + " " + col + " " + observer.getObservedColumn().getColumn());
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java
index 44f229b..9f731b1 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java
@@ -33,10 +33,8 @@ class JsonObservers {
JsonObservers(String obsProviderClass, Map<Column, NotificationType> columns) {
this.obsProviderClass = obsProviderClass;
- this.observedColumns =
- columns.entrySet().stream()
- .map(entry -> new JsonObservedColumn(entry.getKey(), entry.getValue()))
- .collect(toList());
+ this.observedColumns = columns.entrySet().stream()
+ .map(entry -> new JsonObservedColumn(entry.getKey(), entry.getValue())).collect(toList());
}
public String getObserverProviderClass() {
@@ -44,8 +42,8 @@ class JsonObservers {
}
public Map<Column, NotificationType> getObservedColumns() {
- return observedColumns.stream().collect(
- toMap(JsonObservedColumn::getColumn, JsonObservedColumn::getNotificationType));
+ return observedColumns.stream()
+ .collect(toMap(JsonObservedColumn::getColumn, JsonObservedColumn::getNotificationType));
}
@Override
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverRegistry.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverRegistry.java
index 979b867..f1dae0f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverRegistry.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverRegistry.java
@@ -23,10 +23,10 @@ import java.util.Set;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.api.observer.Observer.NotificationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.api.observer.StringObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ObserverRegistry implements ObserverProvider.Registry {
@@ -84,21 +84,21 @@ public class ObserverRegistry implements ObserverProvider.Registry {
if (!closeMethod.getDeclaringClass().equals(Observer.class)) {
log.warn(
"Observer {} implements close(). Close is not called on Observers registered using ObserverProvider."
- + " Close is only called on Observers configured the old way.", obs.getClass()
- .getName());
+ + " Close is only called on Observers configured the old way.",
+ obs.getClass().getName());
}
} catch (NoSuchMethodException | SecurityException e) {
throw new RuntimeException("Failed to check if close() is implemented", e);
}
if (nt == NotificationType.STRONG && !strongColumns.contains(col)) {
- throw new IllegalArgumentException("Column " + col
- + " not previously configured for strong notifications");
+ throw new IllegalArgumentException(
+ "Column " + col + " not previously configured for strong notifications");
}
if (nt == NotificationType.WEAK && !weakColumns.contains(col)) {
- throw new IllegalArgumentException("Column " + col
- + " not previously configured for weak notifications");
+ throw new IllegalArgumentException(
+ "Column " + col + " not previously configured for weak notifications");
}
if (observers.containsKey(col)) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
index 768271c..c7ac734 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
@@ -88,8 +88,8 @@ public class ObserverStoreV2 implements ObserverStore {
+ "found. Check for class name misspellings or failure to include "
+ "the observer provider jar.", e1);
} catch (InstantiationException | IllegalAccessException e2) {
- throw new FluoException("ObserverProvider class '" + obsProviderClass
- + "' could not be created.", e2);
+ throw new FluoException(
+ "ObserverProvider class '" + obsProviderClass + "' could not be created.", e2);
}
return observerProvider;
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
index 4afb0a1..012c4a8 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
@@ -81,8 +81,8 @@ public class OracleClient implements AutoCloseable {
}
}
- private class TimestampRetriever extends LeaderSelectorListenerAdapter implements Runnable,
- PathChildrenCacheListener {
+ private class TimestampRetriever extends LeaderSelectorListenerAdapter
+ implements Runnable, PathChildrenCacheListener {
private LeaderSelector leaderSelector;
private CuratorFramework curatorFramework;
@@ -230,8 +230,8 @@ public class OracleClient implements AutoCloseable {
}
}
- private synchronized void connect() throws IOException, KeeperException, InterruptedException,
- TTransportException {
+ private synchronized void connect()
+ throws IOException, KeeperException, InterruptedException, TTransportException {
getLeader();
while (true) {
@@ -260,8 +260,8 @@ public class OracleClient implements AutoCloseable {
/**
* Atomically closes current connection and connects to the current leader
*/
- private synchronized void reconnect() throws InterruptedException, TTransportException,
- KeeperException, IOException {
+ private synchronized void reconnect()
+ throws InterruptedException, TTransportException, KeeperException, IOException {
if (transport.isOpen()) {
transport.close();
}
@@ -344,12 +344,10 @@ public class OracleClient implements AutoCloseable {
public OracleClient(Environment env) {
this.env = env;
- responseTimer =
- MetricsUtil.getTimer(env.getConfiguration(), env.getSharedResources().getMetricRegistry(),
- env.getMetricNames().getOracleResponseTime());
- stampsHistogram =
- MetricsUtil.getHistogram(env.getConfiguration(), env.getSharedResources()
- .getMetricRegistry(), env.getMetricNames().getOracleClientStamps());
+ responseTimer = MetricsUtil.getTimer(env.getConfiguration(),
+ env.getSharedResources().getMetricRegistry(), env.getMetricNames().getOracleResponseTime());
+ stampsHistogram = MetricsUtil.getHistogram(env.getConfiguration(),
+ env.getSharedResources().getMetricRegistry(), env.getMetricNames().getOracleClientStamps());
timestampRetriever = new TimestampRetriever();
thread = new Thread(timestampRetriever);
thread.setDaemon(true);
@@ -375,8 +373,7 @@ public class OracleClient implements AutoCloseable {
if (waitPeriod < MAX_ORACLE_WAIT_PERIOD) {
waitPeriod *= 2;
}
- log.warn(
- "Waiting for timestamp from Oracle. Is it running? waitTotal={}s waitPeriod={}s",
+ log.warn("Waiting for timestamp from Oracle. Is it running? waitTotal={}s waitPeriod={}s",
waitTotal, waitPeriod);
}
} else if (!tr.cdl.await(timeout, TimeUnit.MILLISECONDS)) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
index 2cb0bb1..0c7ddf0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
@@ -70,8 +70,8 @@ import org.slf4j.LoggerFactory;
* next leader. In the case where an oracle fails over, the next oracle will begin a new block of
* timestamps.
*/
-public class OracleServer extends LeaderSelectorListenerAdapter implements OracleService.Iface,
- PathChildrenCacheListener {
+public class OracleServer extends LeaderSelectorListenerAdapter
+ implements OracleService.Iface, PathChildrenCacheListener {
private static final Logger log = LoggerFactory.getLogger(OracleServer.class);
@@ -132,9 +132,8 @@ public class OracleServer extends LeaderSelectorListenerAdapter implements Oracl
boolean nodeFound = false;
for (String child : children) {
- Long ts =
- LongUtil.fromByteArray(curator.getData().forPath(
- ZookeeperPath.TRANSACTOR_TIMESTAMPS + "/" + child));
+ Long ts = LongUtil.fromByteArray(
+ curator.getData().forPath(ZookeeperPath.TRANSACTOR_TIMESTAMPS + "/" + child));
nodeFound = true;
if (ts < oldestTs) {
oldestTs = ts;
@@ -187,9 +186,8 @@ public class OracleServer extends LeaderSelectorListenerAdapter implements Oracl
public OracleServer(Environment env) throws Exception {
this.env = env;
- stampsHistogram =
- MetricsUtil.getHistogram(env.getConfiguration(), env.getSharedResources()
- .getMetricRegistry(), env.getMetricNames().getOracleServerStamps());
+ stampsHistogram = MetricsUtil.getHistogram(env.getConfiguration(),
+ env.getSharedResources().getMetricRegistry(), env.getMetricNames().getOracleServerStamps());
this.cnxnListener = new CuratorCnxnListener();
this.maxTsPath = ZookeeperPath.ORACLE_MAX_TIMESTAMP;
this.oraclePath = ZookeeperPath.ORACLE_SERVER;
@@ -206,8 +204,8 @@ public class OracleServer extends LeaderSelectorListenerAdapter implements Oracl
long newMax = Long.parseLong(new String(d)) + 1000;
- curatorFramework.setData().withVersion(stat.getVersion())
- .forPath(maxTsPath, LongUtil.toByteArray(newMax));
+ curatorFramework.setData().withVersion(stat.getVersion()).forPath(maxTsPath,
+ LongUtil.toByteArray(newMax));
maxTs = newMax;
if (!isLeader) {
@@ -440,10 +438,9 @@ public class OracleServer extends LeaderSelectorListenerAdapter implements Oracl
throws Exception {
try {
- if (isConnected()
- && (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)
- || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED) || event
- .getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))) {
+ if (isConnected() && (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)
+ || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)
+ || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))) {
synchronized (this) {
Participant participant = leaderSelector.getLeader();
if (isLeader(participant) && !leaderSelector.hasLeadership()) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/thrift/OracleService.java b/modules/core/src/main/java/org/apache/fluo/core/thrift/OracleService.java
index cdfabda..df1aa62 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/thrift/OracleService.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/thrift/OracleService.java
@@ -111,13 +111,15 @@ public class OracleService {
return result.success;
}
throw new org.apache.thrift.TApplicationException(
- org.apache.thrift.TApplicationException.MISSING_RESULT, "isLeader failed: unknown result");
+ org.apache.thrift.TApplicationException.MISSING_RESULT,
+ "isLeader failed: unknown result");
}
}
- public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements
- AsyncIface {
- public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
+ public static class AsyncClient extends org.apache.thrift.async.TAsyncClient
+ implements AsyncIface {
+ public static class Factory
+ implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
private org.apache.thrift.async.TAsyncClientManager clientManager;
private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
@@ -127,7 +129,8 @@ public class OracleService {
this.protocolFactory = protocolFactory;
}
- public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
+ public AsyncClient getAsyncClient(
+ org.apache.thrift.transport.TNonblockingTransport transport) {
return new AsyncClient(protocolFactory, clientManager, transport);
}
}
@@ -232,13 +235,11 @@ public class OracleService {
private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
public Processor(I iface) {
- super(
- iface,
- getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
+ super(iface, getProcessMap(
+ new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
}
- protected Processor(
- I iface,
+ protected Processor(I iface,
Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
super(iface, getProcessMap(processMap));
}
@@ -250,8 +251,8 @@ public class OracleService {
return processMap;
}
- public static class getTimestamps<I extends Iface> extends
- org.apache.thrift.ProcessFunction<I, getTimestamps_args> {
+ public static class getTimestamps<I extends Iface>
+ extends org.apache.thrift.ProcessFunction<I, getTimestamps_args> {
public getTimestamps() {
super("getTimestamps");
}
@@ -272,8 +273,8 @@ public class OracleService {
}
}
- public static class isLeader<I extends Iface> extends
- org.apache.thrift.ProcessFunction<I, isLeader_args> {
+ public static class isLeader<I extends Iface>
+ extends org.apache.thrift.ProcessFunction<I, isLeader_args> {
public isLeader() {
super("isLeader");
}
@@ -297,18 +298,16 @@ public class OracleService {
}
- public static class AsyncProcessor<I extends AsyncIface> extends
- org.apache.thrift.TBaseAsyncProcessor<I> {
+ public static class AsyncProcessor<I extends AsyncIface>
+ extends org.apache.thrift.TBaseAsyncProcessor<I> {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class.getName());
public AsyncProcessor(I iface) {
- super(
- iface,
- getProcessMap(new HashMap<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));
+ super(iface, getProcessMap(
+ new HashMap<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));
}
- protected AsyncProcessor(
- I iface,
+ protected AsyncProcessor(I iface,
Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
super(iface, getProcessMap(processMap));
}
@@ -320,8 +319,8 @@ public class OracleService {
return processMap;
}
- public static class getTimestamps<I extends AsyncIface> extends
- org.apache.thrift.AsyncProcessFunction<I, getTimestamps_args, Stamps> {
+ public static class getTimestamps<I extends AsyncIface>
+ extends org.apache.thrift.AsyncProcessFunction<I, getTimestamps_args, Stamps> {
public getTimestamps() {
super("getTimestamps");
}
@@ -330,7 +329,8 @@ public class OracleService {
return new getTimestamps_args();
}
- public AsyncMethodCallback<Stamps> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+ public AsyncMethodCallback<Stamps> getResultHandler(final AsyncFrameBuffer fb,
+ final int seqid) {
final org.apache.thrift.AsyncProcessFunction fcall = this;
return new AsyncMethodCallback<Stamps>() {
public void onComplete(Stamps o) {
@@ -351,9 +351,8 @@ public class OracleService {
getTimestamps_result result = new getTimestamps_result();
{
msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
- msg =
- (org.apache.thrift.TBase) new org.apache.thrift.TApplicationException(
- org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+ msg = (org.apache.thrift.TBase) new org.apache.thrift.TApplicationException(
+ org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
}
try {
fcall.sendResponse(fb, msg, msgType, seqid);
@@ -376,8 +375,8 @@ public class OracleService {
}
}
- public static class isLeader<I extends AsyncIface> extends
- org.apache.thrift.AsyncProcessFunction<I, isLeader_args, Boolean> {
+ public static class isLeader<I extends AsyncIface>
+ extends org.apache.thrift.AsyncProcessFunction<I, isLeader_args, Boolean> {
public isLeader() {
super("isLeader");
}
@@ -409,9 +408,8 @@ public class OracleService {
isLeader_result result = new isLeader_result();
{
msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
- msg =
- (org.apache.thrift.TBase) new org.apache.thrift.TApplicationException(
- org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+ msg = (org.apache.thrift.TBase) new org.apache.thrift.TApplicationException(
+ org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
}
try {
fcall.sendResponse(fb, msg, msgType, seqid);
@@ -436,8 +434,8 @@ public class OracleService {
}
- public static class getTimestamps_args implements
- org.apache.thrift.TBase<getTimestamps_args, getTimestamps_args._Fields>,
+ public static class getTimestamps_args
+ implements org.apache.thrift.TBase<getTimestamps_args, getTimestamps_args._Fields>,
java.io.Serializable, Cloneable, Comparable<getTimestamps_args> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC =
new org.apache.thrift.protocol.TStruct("getTimestamps_args");
@@ -529,12 +527,13 @@ public class OracleService {
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap =
new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
- tmpMap.put(_Fields.ID, new org.apache.thrift.meta_data.FieldMetaData("id",
- org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(
- org.apache.thrift.protocol.TType.STRING)));
- tmpMap
- .put(_Fields.NUM, new org.apache.thrift.meta_data.FieldMetaData("num",
+ tmpMap.put(_Fields.ID,
+ new org.apache.thrift.meta_data.FieldMetaData("id",
+ org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(
+ org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.NUM,
+ new org.apache.thrift.meta_data.FieldMetaData("num",
org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(
org.apache.thrift.protocol.TType.I32)));
@@ -791,8 +790,8 @@ public class OracleService {
}
}
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException,
- ClassNotFoundException {
+ private void readObject(java.io.ObjectInputStream in)
+ throws java.io.IOException, ClassNotFoundException {
try {
// it doesn't seem like you should have to do this, but java serialization is wacky, and
// doesn't call the default constructor.
@@ -810,8 +809,8 @@ public class OracleService {
}
}
- private static class getTimestamps_argsStandardScheme extends
- StandardScheme<getTimestamps_args> {
+ private static class getTimestamps_argsStandardScheme
+ extends StandardScheme<getTimestamps_args> {
public void read(org.apache.thrift.protocol.TProtocol iprot, getTimestamps_args struct)
throws org.apache.thrift.TException {
@@ -916,8 +915,8 @@ public class OracleService {
}
- public static class getTimestamps_result implements
- org.apache.thrift.TBase<getTimestamps_result, getTimestamps_result._Fields>,
+ public static class getTimestamps_result
+ implements org.apache.thrift.TBase<getTimestamps_result, getTimestamps_result._Fields>,
java.io.Serializable, Cloneable, Comparable<getTimestamps_result> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC =
new org.apache.thrift.protocol.TStruct("getTimestamps_result");
@@ -1001,10 +1000,11 @@ public class OracleService {
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap =
new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
- tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success",
- org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
- Stamps.class)));
+ tmpMap.put(_Fields.SUCCESS,
+ new org.apache.thrift.meta_data.FieldMetaData("success",
+ org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.StructMetaData(
+ org.apache.thrift.protocol.TType.STRUCT, Stamps.class)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getTimestamps_result.class,
metaDataMap);
@@ -1195,8 +1195,8 @@ public class OracleService {
}
}
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException,
- ClassNotFoundException {
+ private void readObject(java.io.ObjectInputStream in)
+ throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(
new org.apache.thrift.transport.TIOStreamTransport(in)));
@@ -1211,8 +1211,8 @@ public class OracleService {
}
}
- private static class getTimestamps_resultStandardScheme extends
- StandardScheme<getTimestamps_result> {
+ private static class getTimestamps_resultStandardScheme
+ extends StandardScheme<getTimestamps_result> {
public void read(org.apache.thrift.protocol.TProtocol iprot, getTimestamps_result struct)
throws org.apache.thrift.TException {
@@ -1298,9 +1298,9 @@ public class OracleService {
}
- public static class isLeader_args implements
- org.apache.thrift.TBase<isLeader_args, isLeader_args._Fields>, java.io.Serializable,
- Cloneable, Comparable<isLeader_args> {
+ public static class isLeader_args
+ implements org.apache.thrift.TBase<isLeader_args, isLeader_args._Fields>,
+ java.io.Serializable, Cloneable, Comparable<isLeader_args> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC =
new org.apache.thrift.protocol.TStruct("isLeader_args");
@@ -1489,8 +1489,8 @@ public class OracleService {
}
}
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException,
- ClassNotFoundException {
+ private void readObject(java.io.ObjectInputStream in)
+ throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(
new org.apache.thrift.transport.TIOStreamTransport(in)));
@@ -1563,9 +1563,9 @@ public class OracleService {
}
- public static class isLeader_result implements
- org.apache.thrift.TBase<isLeader_result, isLeader_result._Fields>, java.io.Serializable,
- Cloneable, Comparable<isLeader_result> {
+ public static class isLeader_result
+ implements org.apache.thrift.TBase<isLeader_result, isLeader_result._Fields>,
+ java.io.Serializable, Cloneable, Comparable<isLeader_result> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC =
new org.apache.thrift.protocol.TStruct("isLeader_result");
@@ -1838,8 +1838,8 @@ public class OracleService {
}
}
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException,
- ClassNotFoundException {
+ private void readObject(java.io.ObjectInputStream in)
+ throws java.io.IOException, ClassNotFoundException {
try {
// it doesn't seem like you should have to do this, but java serialization is wacky, and
// doesn't call the default constructor.
diff --git a/modules/core/src/main/java/org/apache/fluo/core/thrift/Stamps.java b/modules/core/src/main/java/org/apache/fluo/core/thrift/Stamps.java
index af7f257..c63c8e6 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/thrift/Stamps.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/thrift/Stamps.java
@@ -341,7 +341,8 @@ public class Stamps implements org.apache.thrift.TBase<Stamps, Stamps._Fields>,
}
@Override
- public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol oprot)
+ throws org.apache.thrift.TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@@ -376,8 +377,8 @@ public class Stamps implements org.apache.thrift.TBase<Stamps, Stamps._Fields>,
}
}
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException,
- ClassNotFoundException {
+ private void readObject(java.io.ObjectInputStream in)
+ throws java.io.IOException, ClassNotFoundException {
try {
// it doesn't seem like you should have to do this, but java serialization is wacky, and
// doesn't call the default constructor.
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
index 975f46a..1ec1591 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
@@ -33,10 +33,9 @@ public class AccumuloUtil {
* Creates Accumulo instance given FluoConfiguration
*/
public static Instance getInstance(FluoConfiguration config) {
- ClientConfiguration clientConfig =
- new ClientConfiguration().withInstance(config.getAccumuloInstance())
- .withZkHosts(config.getAccumuloZookeepers())
- .withZkTimeout(config.getZookeeperTimeout() / 1000);
+ ClientConfiguration clientConfig = new ClientConfiguration()
+ .withInstance(config.getAccumuloInstance()).withZkHosts(config.getAccumuloZookeepers())
+ .withZkTimeout(config.getZookeeperTimeout() / 1000);
return new ZooKeeperInstance(clientConfig);
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
index be583da..bbc2d03 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
@@ -50,9 +50,9 @@ public class ColumnUtil {
return env.getSharedResources().getVisCache().getCV(col);
}
- public static void commitColumn(Environment env, boolean isTrigger, boolean isPrimary,
- Column col, boolean isWrite, boolean isDelete, long startTs, long commitTs,
- Set<Column> observedColumns, Mutation m) {
+ public static void commitColumn(Environment env, boolean isTrigger, boolean isPrimary, Column col,
+ boolean isWrite, boolean isDelete, long startTs, long commitTs, Set<Column> observedColumns,
+ Mutation m) {
if (isWrite) {
Flutation.put(env, m, col, ColumnConstants.WRITE_PREFIX | commitTs,
WriteValue.encode(startTs, isPrimary, isDelete));
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
index 5f5760b..7e68999 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
@@ -77,11 +77,11 @@ public class CuratorUtil {
config.getZookeeperTimeout(), config.getZookeeperSecret());
}
- private static final List<ACL> CREATOR_ALL_ACL = ImmutableList.of(new ACL(Perms.ALL,
- ZooDefs.Ids.AUTH_IDS));
+ private static final List<ACL> CREATOR_ALL_ACL =
+ ImmutableList.of(new ACL(Perms.ALL, ZooDefs.Ids.AUTH_IDS));
- private static final List<ACL> PUBLICLY_READABLE_ACL = ImmutableList.of(new ACL(Perms.READ,
- ZooDefs.Ids.ANYONE_ID_UNSAFE), new ACL(Perms.ALL, ZooDefs.Ids.AUTH_IDS));
+ private static final List<ACL> PUBLICLY_READABLE_ACL = ImmutableList.of(
+ new ACL(Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE), new ACL(Perms.ALL, ZooDefs.Ids.AUTH_IDS));
/**
* Creates a curator built using the given zookeeper connection string and timeout
@@ -125,8 +125,8 @@ public class CuratorUtil {
while (true) {
try {
- curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
- .forPath(zPath, data);
+ curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zPath,
+ data);
return true;
} catch (Exception nee) {
if (nee instanceof KeeperException.NodeExistsException) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/FluoExecutors.java b/modules/core/src/main/java/org/apache/fluo/core/util/FluoExecutors.java
index a126314..bb1f55b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/FluoExecutors.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/FluoExecutors.java
@@ -28,26 +28,25 @@ public class FluoExecutors {
return newFixedThreadPool(numThreads, new LinkedBlockingQueue<Runnable>(), name);
}
- public static ThreadPoolExecutor newFixedThreadPool(int numThreads,
- BlockingQueue<Runnable> queue, String name) {
- ThreadPoolExecutor tpe =
- new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, queue,
- new FluoThreadFactory(name)) {
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- if (t != null) {
- if (t instanceof Exception) {
- LoggerFactory.getLogger(FluoExecutors.class).warn(
- "Thread pool saw uncaught Exception", t);
- } else {
- // this is likely an Error. Things may be in a really bad state, so just print it
- // instead of logging.
- System.err.println("Threadpool saw uncaught Throwable");
- t.printStackTrace();
- }
- }
+ public static ThreadPoolExecutor newFixedThreadPool(int numThreads, BlockingQueue<Runnable> queue,
+ String name) {
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(numThreads, numThreads, 0L,
+ TimeUnit.MILLISECONDS, queue, new FluoThreadFactory(name)) {
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ if (t != null) {
+ if (t instanceof Exception) {
+ LoggerFactory.getLogger(FluoExecutors.class).warn("Thread pool saw uncaught Exception",
+ t);
+ } else {
+ // this is likely an Error. Things may be in a really bad state, so just print it
+ // instead of logging.
+ System.err.println("Threadpool saw uncaught Throwable");
+ t.printStackTrace();
}
- };
+ }
+ }
+ };
return tpe;
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/FluoThreadFactory.java b/modules/core/src/main/java/org/apache/fluo/core/util/FluoThreadFactory.java
index 194db88..4b2f57b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/FluoThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/FluoThreadFactory.java
@@ -18,7 +18,6 @@ package org.apache.fluo.core.util;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
-
public class FluoThreadFactory implements ThreadFactory {
private static AtomicInteger poolCount = new AtomicInteger();
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index a9f2e22..d545195 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -40,16 +40,14 @@ public class ScanUtil {
public static Span getSpan(ScanOpts options) {
Span span = new Span();
- if ((options.getExactRow() != null)
- && ((options.getStartRow() != null) || (options.getEndRow() != null) || (options
- .getRowPrefix() != null))) {
+ if ((options.getExactRow() != null) && ((options.getStartRow() != null)
+ || (options.getEndRow() != null) || (options.getRowPrefix() != null))) {
throw new IllegalArgumentException(
"You cannot specify an exact row with a start/end row or row prefix!");
}
- if ((options.getRowPrefix() != null)
- && ((options.getStartRow() != null) || (options.getEndRow() != null) || (options
- .getExactRow() != null))) {
+ if ((options.getRowPrefix() != null) && ((options.getStartRow() != null)
+ || (options.getEndRow() != null) || (options.getExactRow() != null))) {
throw new IllegalArgumentException(
"You cannot specify an prefix row with a start/end row or exact row!");
}
@@ -83,8 +81,8 @@ public class ScanUtil {
} else if (colFields.length == 2) {
columns.add(new Column(colFields[0], colFields[1]));
} else {
- throw new IllegalArgumentException("Failed to scan! Column '" + column
- + "' has too many fields (indicated by ':')");
+ throw new IllegalArgumentException(
+ "Failed to scan! Column '" + column + "' has too many fields (indicated by ':')");
}
}
@@ -92,8 +90,8 @@ public class ScanUtil {
}
public static void scanFluo(ScanOpts options, FluoConfiguration sConfig) {
- System.out.println("Scanning snapshot of data in Fluo '" + sConfig.getApplicationName()
- + "' application.");
+ System.out.println(
+ "Scanning snapshot of data in Fluo '" + sConfig.getApplicationName() + "' application.");
long entriesFound = 0;
try (FluoClient client = FluoFactory.newClient(sConfig)) {
@@ -166,8 +164,8 @@ public class ScanUtil {
scanner.setRange(SpanUtil.toRange(span));
for (Column col : columns) {
if (col.isQualifierSet()) {
- scanner
- .fetchColumn(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier()));
+ scanner.fetchColumn(ByteUtil.toText(col.getFamily()),
+ ByteUtil.toText(col.getQualifier()));
} else {
scanner.fetchColumnFamily(ByteUtil.toText(col.getFamily()));
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java
index 3938c07..5c58e11 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java
@@ -21,9 +21,8 @@ import org.apache.fluo.core.worker.finder.hash.PartitionNotificationFinder;
public class NotificationFinderFactory {
public static NotificationFinder newNotificationFinder(FluoConfiguration conf) {
- String clazz =
- conf.getString(FluoConfigurationImpl.WORKER_FINDER_PROP,
- PartitionNotificationFinder.class.getName());
+ String clazz = conf.getString(FluoConfigurationImpl.WORKER_FINDER_PROP,
+ PartitionNotificationFinder.class.getName());
try {
return Class.forName(clazz).asSubclass(NotificationFinder.class).newInstance();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
index c821684..46159b8 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
@@ -194,8 +194,8 @@ public class NotificationProcessor implements AutoCloseable {
}
- private class FutureNotificationTask extends FutureTask<Void> implements
- Comparable<FutureNotificationTask> {
+ private class FutureNotificationTask extends FutureTask<Void>
+ implements Comparable<FutureNotificationTask> {
private final Notification notification;
@@ -227,9 +227,8 @@ public class NotificationProcessor implements AutoCloseable {
public boolean addNotification(final NotificationFinder notificationFinder,
final Notification notification) {
- WorkTaskAsync workTask =
- new WorkTaskAsync(NotificationProcessor.this, notificationFinder, env, notification,
- observers);
+ WorkTaskAsync workTask = new WorkTaskAsync(NotificationProcessor.this, notificationFinder,
+ env, notification, observers);
FutureTask<?> ft = new FutureNotificationTask(notification, notificationFinder, workTask);
if (!tracker.add(notification.getRowColumn(), ft)) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
index 90524c4..4474c78 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
@@ -214,9 +214,8 @@ public class PartitionManager {
private synchronized void scheduleRetry() {
schedExecutor.schedule(this::updatePartitionInfo, retrySleepTime, TimeUnit.MILLISECONDS);
- retrySleepTime =
- Math.min(maxSleepTime,
- (long) (1.5 * retrySleepTime) + (long) (retrySleepTime * Math.random()));
+ retrySleepTime = Math.min(maxSleepTime,
+ (long) (1.5 * retrySleepTime) + (long) (retrySleepTime * Math.random()));
}
private synchronized void scheduleUpdate() {
@@ -236,10 +235,9 @@ public class PartitionManager {
me = ZKPaths.getNodeFromPath(me);
String me2 = me;
- boolean imFirst =
- childrenCache.getCurrentData().stream().map(ChildData::getPath)
- .map(ZKPaths::getNodeFromPath).sorted().findFirst().map(s -> s.equals(me2))
- .orElse(false);
+ boolean imFirst = childrenCache.getCurrentData().stream().map(ChildData::getPath)
+ .map(ZKPaths::getNodeFromPath).sorted().findFirst().map(s -> s.equals(me2))
+ .orElse(false);
if (imFirst) {
@@ -278,13 +276,11 @@ public class PartitionManager {
this.maxSleepTime = maxSleepTime;
this.retrySleepTime = minSleepTime;
- groupSize =
- env.getConfiguration().getInt(FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE,
- FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE_DEFAULT);
+ groupSize = env.getConfiguration().getInt(FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE,
+ FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE_DEFAULT);
- myESNode =
- new PersistentEphemeralNode(curator, Mode.EPHEMERAL_SEQUENTIAL, ZookeeperPath.FINDERS
- + "/" + ZK_FINDER_PREFIX, ("" + groupSize).getBytes(UTF_8));
+ myESNode = new PersistentEphemeralNode(curator, Mode.EPHEMERAL_SEQUENTIAL,
+ ZookeeperPath.FINDERS + "/" + ZK_FINDER_PREFIX, ("" + groupSize).getBytes(UTF_8));
myESNode.start();
myESNode.waitForInitialCreate(1, TimeUnit.MINUTES);
@@ -292,9 +288,8 @@ public class PartitionManager {
childrenCache.getListenable().addListener(new FindersListener());
childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
- schedExecutor =
- Executors.newScheduledThreadPool(1,
- new FluoThreadFactory("Fluo worker partition manager"));
+ schedExecutor = Executors.newScheduledThreadPool(1,
+ new FluoThreadFactory("Fluo worker partition manager"));
schedExecutor.scheduleWithFixedDelay(new CheckTabletsTask(), 0, maxSleepTime,
TimeUnit.MILLISECONDS);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionNotificationFinder.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionNotificationFinder.java
index c3649aa..dbd4339 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionNotificationFinder.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionNotificationFinder.java
@@ -51,9 +51,8 @@ public class PartitionNotificationFinder implements NotificationFinder {
partitionManager = new PartitionManager(env, minSleepTime, maxSleepTime);
- scanThread =
- new Thread(new ScanTask(this, processor, partitionManager, env, stopped, minSleepTime,
- maxSleepTime));
+ scanThread = new Thread(
+ new ScanTask(this, processor, partitionManager, env, stopped, minSleepTime, maxSleepTime));
scanThread.setName(getClass().getSimpleName() + " " + ScanTask.class.getSimpleName());
scanThread.setDaemon(true);
scanThread.start();
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java
index 53ad7ba..fc44a64 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java
@@ -108,9 +108,8 @@ public class SerializedSplits {
public static byte[] serializeTableSplits(Environment env) {
List<Bytes> splits;
try {
- splits =
- env.getConnector().tableOperations().listSplits(env.getTable()).stream()
- .map(ByteUtil::toBytes).collect(Collectors.toList());
+ splits = env.getConnector().tableOperations().listSplits(env.getTable()).stream()
+ .map(ByteUtil::toBytes).collect(Collectors.toList());
} catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
throw new RuntimeException(e);
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
index 30a97f4..a7517f8 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
@@ -80,8 +80,8 @@ public class TableRange implements Comparable<TableRange> {
tablets.add(new TableRange(i == 0 ? null : sortedRows.get(i - 1), sortedRows.get(i)));
}
- tablets.add(new TableRange(sortedRows.size() == 0 ? null
- : sortedRows.get(sortedRows.size() - 1), null));
+ tablets.add(new TableRange(
+ sortedRows.size() == 0 ? null : sortedRows.get(sortedRows.size() - 1), null));
return tablets;
}
diff --git a/modules/core/src/test/java/org/apache/fluo/core/util/ByteUtilTest.java b/modules/core/src/test/java/org/apache/fluo/core/util/ByteUtilTest.java
index 0b17961..35c4a88 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/util/ByteUtilTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/core/util/ByteUtilTest.java
@@ -54,10 +54,9 @@ public class ByteUtilTest {
Bytes b1 = Bytes.of("str1");
Bytes b2 = Bytes.of("string2");
Bytes b3 = Bytes.of("s3");
- Bytes b4 =
- Bytes.of("testinggreaterthan128characterstestinggreaterthan128characters"
- + "testinggreaterthan128characterstestinggreaterthan128characters"
- + "testinggreaterthan128characters"); // 155 length
+ Bytes b4 = Bytes.of("testinggreaterthan128characterstestinggreaterthan128characters"
+ + "testinggreaterthan128characterstestinggreaterthan128characters"
+ + "testinggreaterthan128characters"); // 155 length
byte[] ball = ByteArrayUtil.concat(b1, b2, b3, b4);
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
index b3d0d64..3bf5307 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
@@ -41,17 +41,15 @@ public class PartitionManagerTest {
for (int numWorkers : new int[] {1, 5, 10, 11, 100}) {
for (int groupSize : new int[] {1, 2, 3, 5, 7, 13, 17, 19, 43, 73, 97}) {
int expectedGroups = Math.max(1, numWorkers / groupSize);
- int maxGroupSize =
- Math.min(numWorkers,
- groupSize + (int) Math.ceil((numWorkers % groupSize) / (double) expectedGroups));
+ int maxGroupSize = Math.min(numWorkers,
+ groupSize + (int) Math.ceil((numWorkers % groupSize) / (double) expectedGroups));
TreeSet<String> children = new TreeSet<>();
IntStream.range(0, numWorkers).mapToObj(nff).forEach(children::add);
- Collection<Bytes> rows =
- IntStream.iterate(0, i -> i + 1000).limit(numSplits)
- .mapToObj(i -> String.format("r%06d", i)).map(Bytes::of).collect(toList());
+ Collection<Bytes> rows = IntStream.iterate(0, i -> i + 1000).limit(numSplits)
+ .mapToObj(i -> String.format("r%06d", i)).map(Bytes::of).collect(toList());
Collection<TableRange> tablets = TableRange.toTabletRanges(rows);
Set<String> idCombos = new HashSet<>();
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/SerializedSplitsTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/SerializedSplitsTest.java
index 13aa941..79f1ec5 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/SerializedSplitsTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/SerializedSplitsTest.java
@@ -31,9 +31,8 @@ import static java.util.stream.Collectors.toSet;
public class SerializedSplitsTest {
@Test
public void testLotsOfSplits() {
- List<Bytes> splits =
- IntStream.iterate(0, i -> i + 13).limit(1_000_000).mapToObj(i -> String.format("%08x", i))
- .map(Bytes::of).collect(toList());
+ List<Bytes> splits = IntStream.iterate(0, i -> i + 13).limit(1_000_000)
+ .mapToObj(i -> String.format("%08x", i)).map(Bytes::of).collect(toList());
byte[] data = SerializedSplits.serialize(splits);
Assert.assertTrue(data.length <= SerializedSplits.MAX_SIZE);
@@ -44,9 +43,8 @@ public class SerializedSplitsTest {
Assert.assertTrue(splits2.size() > 10_000);
Assert.assertTrue(splits2.size() < splits.size());
- int expectedDiff =
- Integer.parseInt(splits2.get(1).toString(), 16)
- - Integer.parseInt(splits2.get(0).toString(), 16);
+ int expectedDiff = Integer.parseInt(splits2.get(1).toString(), 16)
+ - Integer.parseInt(splits2.get(0).toString(), 16);
Assert.assertTrue(expectedDiff > 13);
Assert.assertTrue(expectedDiff % 13 == 0);
// check that splits are evenly spaced
@@ -65,9 +63,8 @@ public class SerializedSplitsTest {
@Test
public void testSimple() {
- Set<Bytes> splits =
- IntStream.iterate(0, i -> i + 13).limit(1_000).mapToObj(i -> String.format("%08x", i))
- .map(Bytes::of).collect(toSet());
+ Set<Bytes> splits = IntStream.iterate(0, i -> i + 13).limit(1_000)
+ .mapToObj(i -> String.format("%08x", i)).map(Bytes::of).collect(toSet());
byte[] data = SerializedSplits.serialize(splits);
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
index c73044a..637186f 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
@@ -117,8 +117,9 @@ public class TableRangeTest {
public void testToRange() {
for (String prev : new String[] {null, "foo"}) {
for (String end : new String[] {null, "zoo"}) {
- Assert.assertEquals(new Range(prev, false, end, true), new TableRange(prev == null ? null
- : Bytes.of(prev), end == null ? null : Bytes.of(end)).getRange());
+ Assert.assertEquals(new Range(prev, false, end, true),
+ new TableRange(prev == null ? null : Bytes.of(prev), end == null ? null : Bytes.of(end))
+ .getRange());
}
}
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/BankUtil.java b/modules/integration/src/test/java/org/apache/fluo/integration/BankUtil.java
index 37e7df2..ab33fb2 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/BankUtil.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/BankUtil.java
@@ -27,7 +27,8 @@ public class BankUtil {
private BankUtil() {}
- public static void transfer(Environment env, String from, String to, int amount) throws Exception {
+ public static void transfer(Environment env, String from, String to, int amount)
+ throws Exception {
TestTransaction tx = new TestTransaction(env);
int bal1 = Integer.parseInt(tx.gets(from, BALANCE));
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
index 8f644eb..b16c790 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
@@ -41,10 +41,10 @@ public class ITBase {
protected final static String USER = "root";
protected final static String PASSWORD = "ITSecret";
protected final static String TABLE_BASE = "table";
- protected final static String IT_INSTANCE_NAME_PROP = FluoConfiguration.FLUO_PREFIX
- + ".it.instance.name";
- protected final static String IT_INSTANCE_CLEAR_PROP = FluoConfiguration.FLUO_PREFIX
- + ".it.instance.clear";
+ protected final static String IT_INSTANCE_NAME_PROP =
+ FluoConfiguration.FLUO_PREFIX + ".it.instance.name";
+ protected final static String IT_INSTANCE_CLEAR_PROP =
+ FluoConfiguration.FLUO_PREFIX + ".it.instance.clear";
protected static String instanceName;
protected static Connector conn;
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
index ac2a774..57b3923 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
@@ -136,13 +136,13 @@ public class TestTransaction extends AbstractTransactionBase implements Transact
return tx.preCommit(cd, primary);
}
- public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) throws AccumuloException,
- AccumuloSecurityException {
+ public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp)
+ throws AccumuloException, AccumuloSecurityException {
return tx.commitPrimaryColumn(cd, commitStamp);
}
- public void finishCommit(CommitData cd, Stamp commitStamp) throws MutationsRejectedException,
- TableNotFoundException {
+ public void finishCommit(CommitData cd, Stamp commitStamp)
+ throws MutationsRejectedException, TableNotFoundException {
tx.finishCommit(cd, commitStamp);
env.getSharedResources().getBatchWriter().waitForAsyncFlush();
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index 58eface..5c7bd6b 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -106,7 +106,8 @@ public class FluoAdminImplIT extends ITBaseImpl {
InitializationOptions opts =
new InitializationOptions().setClearZookeeper(true).setClearTable(true);
- for (String host : new String[] {"localhost", "localhost/", "localhost:9999", "localhost:9999/"}) {
+ for (String host : new String[] {"localhost", "localhost/", "localhost:9999",
+ "localhost:9999/"}) {
config.setInstanceZookeepers(host);
try (FluoAdmin fluoAdmin = new FluoAdminImpl(config)) {
fluoAdmin.initialize(opts);
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
index 7fc39e3..6e9abe4 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
@@ -163,13 +163,12 @@ public class CollisionIT extends ITBaseMini {
for (Entry<Key, Value> entry : scanner) {
Key k = entry.getKey();
- String rowCol =
- k.getRow() + ":" + k.getColumnFamily() + ":" + k.getColumnQualifier() + ":"
- + String.format("%x", k.getTimestamp() & ColumnConstants.PREFIX_MASK);
+ String rowCol = k.getRow() + ":" + k.getColumnFamily() + ":" + k.getColumnQualifier() + ":"
+ + String.format("%x", k.getTimestamp() & ColumnConstants.PREFIX_MASK);
if (rowCols.contains(rowCol)) {
System.err.println("DEBUG oldestTs : " + oldestTS + " recentTS : " + recentTS);
- Iterables.transform(scanner, e -> "DEBUG " + FluoFormatter.toString(e)).forEach(
- System.err::println);
+ Iterables.transform(scanner, e -> "DEBUG " + FluoFormatter.toString(e))
+ .forEach(System.err::println);
}
Assert.assertFalse("Duplicate row col " + rowCol, rowCols.contains(rowCol));
rowCols.add(rowCol);
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FaultyConfig.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FaultyConfig.java
index 4da6b08..60f43ca 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FaultyConfig.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FaultyConfig.java
@@ -43,7 +43,8 @@ public class FaultyConfig extends Environment {
private Random rand;
private double wp;
- FaultyConditionalWriter(ConditionalWriter cw, double unknownProbability, double writeProbability) {
+ FaultyConditionalWriter(ConditionalWriter cw, double unknownProbability,
+ double writeProbability) {
this.cw = cw;
this.up = unknownProbability;
this.wp = writeProbability;
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
index 0265d0e..b1352ee 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
@@ -512,9 +512,8 @@ public class FluoIT extends ITBaseImpl {
Map<String, Map<Column, String>> map1 =
tx2.gets(Arrays.asList("d:0001", "d:0002"), Collections.singleton(ccol));
- Map<String, ImmutableMap<Column, String>> expected1 =
- ImmutableMap.of("d:0001", ImmutableMap.of(ccol, "abc def"), "d:0002",
- ImmutableMap.of(ccol, "neb feg"));
+ Map<String, ImmutableMap<Column, String>> expected1 = ImmutableMap.of("d:0001",
+ ImmutableMap.of(ccol, "abc def"), "d:0002", ImmutableMap.of(ccol, "neb feg"));
Assert.assertEquals(expected1, map1);
Assert.assertEquals("45", tx2.gets("d:0001", tcol));
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
index 5c9c02f..0ab398a 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
@@ -52,9 +52,8 @@ public class ObserverConfigIT extends ITBaseMini {
SimpleConfiguration myConfig = context.getObserverConfiguration();
String ocTokens[] = myConfig.getString("observedCol").split(":");
- observedColumn =
- new ObservedColumn(new Column(ocTokens[0], ocTokens[1]),
- NotificationType.valueOf(ocTokens[2]));
+ observedColumn = new ObservedColumn(new Column(ocTokens[0], ocTokens[1]),
+ NotificationType.valueOf(ocTokens[2]));
outputCQ = Bytes.of(myConfig.getString("outputCQ"));
String swn = myConfig.getString("setWeakNotification", "false");
if (swn.equals("true")) {
@@ -100,15 +99,15 @@ public class ObserverConfigIT extends ITBaseMini {
protected void setupObservers(FluoConfiguration fc) {
List<ObserverSpecification> observers = new ArrayList<>();
- observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(), newMap(
- "observedCol", "fam1:col1:" + NotificationType.STRONG, "outputCQ", "col2")));
+ observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(),
+ newMap("observedCol", "fam1:col1:" + NotificationType.STRONG, "outputCQ", "col2")));
- observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(), newMap(
- "observedCol", "fam1:col2:" + NotificationType.STRONG, "outputCQ", "col3",
- "setWeakNotification", "true")));
+ observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(),
+ newMap("observedCol", "fam1:col2:" + NotificationType.STRONG, "outputCQ", "col3",
+ "setWeakNotification", "true")));
- observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(), newMap(
- "observedCol", "fam1:col3:" + NotificationType.WEAK, "outputCQ", "col4")));
+ observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(),
+ newMap("observedCol", "fam1:col3:" + NotificationType.WEAK, "outputCQ", "col4")));
fc.addObservers(observers);
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
index 6d62177..e8af4c7 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java
@@ -45,10 +45,8 @@ public class ScannerIT extends ITBaseImpl {
Assert.assertEquals(2, expectedR2.size());
HashSet<RowColumnValue> expectedR2c = new HashSet<>();
- Iterables.addAll(
- expectedR2c,
- Iterables.filter(expected,
- rcv -> rcv.getsRow().equals("r2") && rcv.getColumn().equals(new Column("f1", "q2"))));
+ Iterables.addAll(expectedR2c, Iterables.filter(expected,
+ rcv -> rcv.getsRow().equals("r2") && rcv.getColumn().equals(new Column("f1", "q2"))));
Assert.assertEquals(1, expectedR2c.size());
HashSet<RowColumnValue> expectedC = new HashSet<>();
@@ -62,8 +60,7 @@ public class ScannerIT extends ITBaseImpl {
Assert.assertEquals(2, expectedCF.size());
HashSet<RowColumnValue> expectedCols = new HashSet<>();
- Iterables.addAll(
- expectedCols,
+ Iterables.addAll(expectedCols,
Iterables.filter(expected, rcv -> rcv.getColumn().equals(new Column("f2", "q5"))
|| rcv.getColumn().equals(new Column("f1", "q1"))));
Assert.assertEquals(3, expectedCols.size());
@@ -74,8 +71,8 @@ public class ScannerIT extends ITBaseImpl {
Assert.assertEquals(expectedR2, actual);
actual.clear();
- Iterables.addAll(actual, snap.scanner().over(Span.exact("r2")).fetch(new Column("f1", "q2"))
- .build());
+ Iterables.addAll(actual,
+ snap.scanner().over(Span.exact("r2")).fetch(new Column("f1", "q2")).build());
Assert.assertEquals(expectedR2c, actual);
actual.clear();
@@ -87,8 +84,8 @@ public class ScannerIT extends ITBaseImpl {
Assert.assertEquals(expectedCF, actual);
actual.clear();
- Iterables.addAll(actual, snap.scanner().fetch(new Column("f2", "q5"), new Column("f1", "q1"))
- .build());
+ Iterables.addAll(actual,
+ snap.scanner().fetch(new Column("f2", "q5"), new Column("f1", "q1")).build());
Assert.assertEquals(expectedCols, actual);
}
@@ -102,10 +99,8 @@ public class ScannerIT extends ITBaseImpl {
Column col2 = new Column("f2", "q3");
HashSet<RowColumnValue> expectedC = new HashSet<>();
- Iterables.addAll(
- expectedC,
- Iterables.filter(expected,
- rcv -> rcv.getColumn().equals(col1) || rcv.getColumn().equals(col2)));
+ Iterables.addAll(expectedC, Iterables.filter(expected,
+ rcv -> rcv.getColumn().equals(col1) || rcv.getColumn().equals(col2)));
Assert.assertEquals(3, expectedC.size());
try (Snapshot snap = client.newSnapshot()) {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java
index 37df6fa..8afa320 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java
@@ -201,8 +201,8 @@ public class StochasticBankIT extends ITBaseImpl {
long t2 = System.currentTimeMillis();
log.debug("avg : %,9.2f min : %,6d max : %,6d stddev : %1.2f rate : %,6.2f\n",
- stat.getAverage(), stat.getMin(), stat.getMax(), stat.getStdDev(), numAccounts
- / ((t2 - t1) / 1000.0));
+ stat.getAverage(), stat.getMin(), stat.getMax(), stat.getStdDev(),
+ numAccounts / ((t2 - t1) / 1000.0));
if (stat.getSum() != numAccounts * 1000) {
if (lastTx != null) {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
index 820f48c..018771f 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
@@ -52,7 +52,8 @@ public class StrongNotificationIT extends ITBaseMini {
@Test
public void testRollforward() throws Exception {
// test for bug #642
- try (Environment env = new Environment(config); TransactorNode tnode = new TransactorNode(env)) {
+ try (Environment env = new Environment(config);
+ TransactorNode tnode = new TransactorNode(env)) {
TestTransaction tx = new TestTransaction(env, tnode);
// set three columns that should each trigger observers
@@ -63,8 +64,8 @@ public class StrongNotificationIT extends ITBaseMini {
// partially commit transaction
CommitData cd = tx.createCommitData();
Assert.assertTrue(tx.preCommit(cd));
- Assert.assertTrue(tx.commitPrimaryColumn(cd, env.getSharedResources().getOracleClient()
- .getStamp()));
+ Assert.assertTrue(
+ tx.commitPrimaryColumn(cd, env.getSharedResources().getOracleClient().getStamp()));
tx.close();
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/TimestampTrackerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/TimestampTrackerIT.java
index 9ea0303..3712813 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/TimestampTrackerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/TimestampTrackerIT.java
@@ -128,9 +128,8 @@ public class TimestampTrackerIT extends ITBaseImpl {
long oldestTs = Long.MAX_VALUE;
for (String child : children) {
- Long ts =
- LongUtil.fromByteArray(curator.getData().forPath(
- ZookeeperPath.TRANSACTOR_TIMESTAMPS + "/" + child));
+ Long ts = LongUtil.fromByteArray(
+ curator.getData().forPath(ZookeeperPath.TRANSACTOR_TIMESTAMPS + "/" + child));
if (ts < oldestTs) {
oldestTs = ts;
}
@@ -140,14 +139,15 @@ public class TimestampTrackerIT extends ITBaseImpl {
}
private boolean zkNodeExists(TimestampTracker tracker) throws Exception {
- return env.getSharedResources().getCurator().checkExists().forPath(tracker.getNodePath()) != null;
+ return env.getSharedResources().getCurator().checkExists()
+ .forPath(tracker.getNodePath()) != null;
}
private long zkNodeValue(TimestampTracker tracker) throws Exception {
if (zkNodeExists(tracker) == false) {
throw new IllegalStateException("node does not exist");
}
- return LongUtil.fromByteArray(env.getSharedResources().getCurator().getData()
- .forPath(tracker.getNodePath()));
+ return LongUtil.fromByteArray(
+ env.getSharedResources().getCurator().getData().forPath(tracker.getNodePath()));
}
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactorIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactorIT.java
index 346deec..00e05a9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactorIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactorIT.java
@@ -54,10 +54,10 @@ public class TransactorIT extends ITBaseImpl {
Assert.assertEquals(id2, t2.getTransactorID().getLongID());
Assert.assertTrue(checkExists(t1));
Assert.assertTrue(checkExists(t2));
- Assert.assertArrayEquals("2".getBytes(), env.getSharedResources().getCurator().getData()
- .forPath(t1.getNodePath()));
- Assert.assertArrayEquals("3".getBytes(), env.getSharedResources().getCurator().getData()
- .forPath(t2.getNodePath()));
+ Assert.assertArrayEquals("2".getBytes(),
+ env.getSharedResources().getCurator().getData().forPath(t1.getNodePath()));
+ Assert.assertArrayEquals("3".getBytes(),
+ env.getSharedResources().getCurator().getData().forPath(t2.getNodePath()));
// verify the cache
Assert.assertTrue(cache.checkExists(id1));
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
index cd2584a..c1ea66f 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -151,8 +151,8 @@ public class WorkerIT extends ITBaseMini {
Assert.fail();
} catch (IllegalArgumentException ise) {
- Assert.assertTrue(ise.getMessage().contains(
- "Column attr2 lastupdate not previously configured for strong notifications"));
+ Assert.assertTrue(ise.getMessage()
+ .contains("Column attr2 lastupdate not previously configured for strong notifications"));
} finally {
observedColumn = LAST_UPDATE;
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ZKSecretIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ZKSecretIT.java
index 0cc4ac1..4350f4b 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ZKSecretIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ZKSecretIT.java
@@ -15,7 +15,6 @@
package org.apache.fluo.integration.impl;
-
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -122,9 +121,8 @@ public class ZKSecretIT extends ITBaseMini {
// Verify oracle gc timestamp is visible w/o a password. The GC iterator that runs in Accumulo
// tablet servers reads this.
- String ts =
- new String(zk.getData(ZookeeperPath.ORACLE_GC_TIMESTAMP, false, null),
- StandardCharsets.UTF_8);
+ String ts = new String(zk.getData(ZookeeperPath.ORACLE_GC_TIMESTAMP, false, null),
+ StandardCharsets.UTF_8);
Assert.assertTrue(ts.matches("\\d+"));
// the timestamp should be read only... trying to modify it should fail
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 2f49b4f..a4749f3 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -219,21 +219,16 @@ public class LogIT extends ITBaseMini {
String logMsgs = writer.toString();
logMsgs = logMsgs.replace('\n', ' ');
- Assert
- .assertTrue(logMsgs
- .matches(".*txid: \\d+ thread : \\d+ time: \\d+ \\(\\d+ \\d+\\) #ret: 0 #set: 1 #collisions: 0 waitTime: \\d+ committed: true class: TriggerLoader.*"));
- Assert
- .assertTrue(logMsgs
- .matches(".*txid: \\d+ thread : \\d+ time: \\d+ \\(\\d+ \\d+\\) #ret: 1 #set: 1 #collisions: 0 waitTime: \\d+ committed: true class: SimpleLoader.*"));
- Assert
- .assertTrue(logMsgs
- .matches(".*txid: \\d+ thread : \\d+ time: \\d+ \\(\\d+ \\d+\\) #ret: 1 #set: 1 #collisions: 1 waitTime: \\d+ committed: false class: SimpleLoader.*"));
- Assert
- .assertTrue(logMsgs
- .matches(".*txid: \\d+ thread : \\d+ time: \\d+ \\(\\d+ \\d+\\) #ret: 2 #set: 1 #collisions: 0 waitTime: \\d+ committed: true class: TestObserver.*"));
- Assert
- .assertTrue(logMsgs
- .matches(".*txid: \\d+ thread : \\d+ time: \\d+ \\(\\d+ \\d+\\) #ret: 2 #set: 1 #collisions: 1 waitTime: \\d+ committed: false class: TestObserver.*"));
+ Assert.assertTrue(logMsgs.matches(
+ ".*txid: \\d+ thread : \\d+ time: \\d+ \\(\\d+ \\d+\\) #ret: 0 #set: 1 #collisions: 0 waitTime: \\d+ committed: true class: TriggerLoader.*"));
+ Assert.assertTrue(logMsgs.matches(
+ ".*txid: \\d+ thread : \\d+ time: \\d+ \\(\\d+ \\d+\\) #ret: 1 #set: 1 #collisions: 0 waitTime: \\d+ committed: true class: SimpleLoader.*"));
+ Assert.assertTrue(logMsgs.matches(
+ ".*txid: \\d+ thread : \\d+ time: \\d+ \\(\\d+ \\d+\\) #ret: 1 #set: 1 #collisions: 1 waitTime: \\d+ committed: false class: SimpleLoader.*"));
+ Assert.assertTrue(logMsgs.matches(
+ ".*txid: \\d+ thread : \\d+ time: \\d+ \\(\\d+ \\d+\\) #ret: 2 #set: 1 #collisions: 0 waitTime: \\d+ committed: true class: TestObserver.*"));
+ Assert.assertTrue(logMsgs.matches(
+ ".*txid: \\d+ thread : \\d+ time: \\d+ \\(\\d+ \\d+\\) #ret: 2 #set: 1 #collisions: 1 waitTime: \\d+ committed: false class: TestObserver.*"));
}
@Test
@@ -352,10 +347,9 @@ public class LogIT extends ITBaseMini {
ImmutableMap.of(new RowColumn("r1", c1), "v1", new RowColumn("r2", c2), "v4"), ret1);
Map<String, Map<Column, String>> ret2 =
snap.gets(Arrays.asList("r1", "r2"), ImmutableSet.of(c1));
- Assert
- .assertEquals(
- ImmutableMap.of("r1", ImmutableMap.of(c1, "v1"), "r2", ImmutableMap.of(c1, "v3")),
- ret2);
+ Assert.assertEquals(
+ ImmutableMap.of("r1", ImmutableMap.of(c1, "v1"), "r2", ImmutableMap.of(c1, "v3")),
+ ret2);
Map<Column, String> ret3 = snap.gets("r1", ImmutableSet.of(c1, c2));
Assert.assertEquals(ImmutableMap.of(c1, "v1", c2, "v2"), ret3);
Assert.assertEquals("v1", snap.gets("r1", c1));
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
index d8ea54b..56418ce 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
@@ -91,12 +91,11 @@ public class FluoEntryInputFormat extends InputFormat<RowColumn, Bytes> {
}
@Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
- InterruptedException {
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
try {
- ByteArrayInputStream bais =
- new ByteArrayInputStream(context.getConfiguration().get(PROPS_CONF_KEY)
- .getBytes(StandardCharsets.UTF_8));
+ ByteArrayInputStream bais = new ByteArrayInputStream(
+ context.getConfiguration().get(PROPS_CONF_KEY).getBytes(StandardCharsets.UTF_8));
env = new Environment(new FluoConfiguration(bais));
@@ -158,8 +157,8 @@ public class FluoEntryInputFormat extends InputFormat<RowColumn, Bytes> {
AccumuloInputFormat.setZooKeeperInstance(conf, fconfig.getAccumuloInstance(),
fconfig.getAccumuloZookeepers());
- AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(), new PasswordToken(
- fconfig.getAccumuloPassword()));
+ AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
+ new PasswordToken(fconfig.getAccumuloPassword()));
AccumuloInputFormat.setInputTableName(conf, env.getTable());
AccumuloInputFormat.setScanAuthorizations(conf, env.getAuthorizations());
}
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
index b282e08..0e1d3a0 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
@@ -48,8 +48,8 @@ public class FluoOutputFormat extends OutputFormat<Loader, NullWritable> {
}
@Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException,
- InterruptedException {
+ public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
+ throws IOException, InterruptedException {
return new OutputCommitter() {
@Override
@@ -77,9 +77,8 @@ public class FluoOutputFormat extends OutputFormat<Loader, NullWritable> {
public RecordWriter<Loader, NullWritable> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
- ByteArrayInputStream bais =
- new ByteArrayInputStream(context.getConfiguration().get(PROPS_CONF_KEY)
- .getBytes(StandardCharsets.UTF_8));
+ ByteArrayInputStream bais = new ByteArrayInputStream(
+ context.getConfiguration().get(PROPS_CONF_KEY).getBytes(StandardCharsets.UTF_8));
FluoConfiguration config = new FluoConfiguration(bais);
@@ -99,8 +98,8 @@ public class FluoOutputFormat extends OutputFormat<Loader, NullWritable> {
}
@Override
- public void write(Loader loader, NullWritable nullw) throws IOException,
- InterruptedException {
+ public void write(Loader loader, NullWritable nullw)
+ throws IOException, InterruptedException {
lexecutor.execute(loader);
}
};
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
index 9dc5e72..37d7c58 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
@@ -91,12 +91,11 @@ public class FluoRowInputFormat extends InputFormat<Bytes, Iterator<ColumnValue>
}
@Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
- InterruptedException {
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
try {
- ByteArrayInputStream bais =
- new ByteArrayInputStream(context.getConfiguration().get(PROPS_CONF_KEY)
- .getBytes(StandardCharsets.UTF_8));
+ ByteArrayInputStream bais = new ByteArrayInputStream(
+ context.getConfiguration().get(PROPS_CONF_KEY).getBytes(StandardCharsets.UTF_8));
env = new Environment(new FluoConfiguration(bais));
@@ -159,8 +158,8 @@ public class FluoRowInputFormat extends InputFormat<Bytes, Iterator<ColumnValue>
AccumuloInputFormat.setZooKeeperInstance(conf, fconfig.getAccumuloInstance(),
fconfig.getAccumuloZookeepers());
- AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(), new PasswordToken(
- fconfig.getAccumuloPassword()));
+ AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
+ new PasswordToken(fconfig.getAccumuloPassword()));
AccumuloInputFormat.setInputTableName(conf, env.getTable());
AccumuloInputFormat.setScanAuthorizations(conf, env.getAuthorizations());
}
diff --git a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
index d96dc1a..05b163e 100644
--- a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
+++ b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
@@ -48,8 +48,8 @@ public class FluoFileOutputFormatIT extends ITBaseImpl {
private FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
@Override
- public void map(LongWritable key, Text data, Context context) throws IOException,
- InterruptedException {
+ public void map(LongWritable key, Text data, Context context)
+ throws IOException, InterruptedException {
String fields[] = data.toString().split(",");
fkvg.setRow(fields[0]).setColumn(new Column(fields[1], fields[2])).setValue(fields[3]);
@@ -61,8 +61,8 @@ public class FluoFileOutputFormatIT extends ITBaseImpl {
}
@Rule
- public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir")
- + "/target"));
+ public TemporaryFolder tempFolder =
+ new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
@Test
public void testImportFile() throws Exception {
--
To stop receiving notification emails like this one, please contact
"commits@fluo.apache.org" <co...@fluo.apache.org>.