You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ja...@apache.org on 2018/10/15 14:46:55 UTC
incubator-omid git commit: OMID-117 Ensure timeouts are configured
low for RPCs to commit table
Repository: incubator-omid
Updated Branches:
refs/heads/phoenix-integration c725f1be5 -> a40b6f8ea
OMID-117 Ensure timeouts are configured low for RPCs to commit table
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/a40b6f8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/a40b6f8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/a40b6f8e
Branch: refs/heads/phoenix-integration
Commit: a40b6f8eab04de2808b9d177fb7e87e36530a354
Parents: c725f1b
Author: James Taylor <ja...@apache.org>
Authored: Thu Oct 11 08:50:26 2018 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Sun Oct 14 09:22:38 2018 -0700
----------------------------------------------------------------------
.../transaction/HBaseTransactionManager.java | 3 +-
.../apache/omid/transaction/OmidTestBase.java | 1 -
.../committable/hbase/HBaseCommitTable.java | 12 +-
.../committable/hbase/TestHBaseCommitTable.java | 6 +-
.../regionserver/RegionConnectionFactory.java | 153 +++++++++++++++++++
.../apache/omid/transaction/OmidCompactor.java | 13 +-
.../omid/transaction/OmidSnapshotFilter.java | 24 +--
.../InterRegionServerRpcController.java | 50 ++++++
.../InterRegionServerRpcControllerFactory.java | 63 ++++++++
.../main/java/org/apache/omid/HBaseShims.java | 21 ++-
.../InterRegionServerRpcController.java | 51 +++++++
.../InterRegionServerRpcControllerFactory.java | 63 ++++++++
.../main/java/org/apache/omid/HBaseShims.java | 20 ++-
pom.xml | 2 -
.../org/apache/omid/tso/client/TSOClient.java | 2 -
.../org/apache/omid/tso/client/TSOProtocol.java | 2 -
.../org/apache/omid/tso/MonitoringContext.java | 1 -
.../apache/omid/tso/RequestProcessorImpl.java | 1 -
.../org/apache/omid/tso/TestRetryProcessor.java | 1 -
.../apache/omid/tso/TestTimestampOracle.java | 1 -
.../apache/omid/tso/TestWorldTimeOracle.java | 2 -
21 files changed, 453 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
index 12323c3..85c785e 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
@@ -121,7 +122,7 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
private Optional<CommitTable.Client> buildCommitTableClient() throws IOException {
HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
- CommitTable commitTable = new HBaseCommitTable(hbaseOmidClientConf.getHBaseConfiguration(), commitTableConf);
+ CommitTable commitTable = new HBaseCommitTable(ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration()), commitTableConf);
return Optional.of(commitTable.getClient());
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
index 3bbcc6e..d0907f3 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.omid.HBaseShims;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.InMemoryCommitTable;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
----------------------------------------------------------------------
diff --git a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
index eb0ba04..447bc37 100644
--- a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
+++ b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
@@ -74,12 +74,20 @@ public class HBaseCommitTable implements CommitTable {
*/
@Inject
public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config) throws IOException {
- this(hbaseConfig, config, KeyGeneratorImplementations.defaultKeyGenerator());
+ this(ConnectionFactory.createConnection(hbaseConfig), config, KeyGeneratorImplementations.defaultKeyGenerator());
+ }
+
+ public HBaseCommitTable(Connection hbaseConnection, HBaseCommitTableConfig config) throws IOException {
+ this(hbaseConnection, config, KeyGeneratorImplementations.defaultKeyGenerator());
}
public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config, KeyGenerator keygen) throws IOException {
+ this(ConnectionFactory.createConnection(hbaseConfig), config, keygen);
+ }
+
+ public HBaseCommitTable(Connection hbaseConnection, HBaseCommitTableConfig config, KeyGenerator keygen) throws IOException {
- this.hbaseConnection = ConnectionFactory.createConnection(hbaseConfig);
+ this.hbaseConnection = hbaseConnection;
this.tableName = config.getTableName();
this.commitTableFamily = config.getCommitTableFamily();
this.lowWatermarkFamily = config.getLowWatermarkFamily();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
----------------------------------------------------------------------
diff --git a/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java b/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
index d684d18..b67e2a8 100644
--- a/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
+++ b/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
@@ -137,7 +137,7 @@ public class TestHBaseCommitTable {
public void testBasicBehaviour() throws Throwable {
HBaseCommitTableConfig config = new HBaseCommitTableConfig();
config.setTableName(TEST_TABLE);
- HBaseCommitTable commitTable = new HBaseCommitTable(hbaseConf, config);
+ HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
Writer writer = commitTable.getWriter();
Client client = commitTable.getClient();
@@ -207,7 +207,7 @@ public class TestHBaseCommitTable {
HBaseCommitTableConfig config = new HBaseCommitTableConfig();
config.setTableName(TEST_TABLE);
- HBaseCommitTable commitTable = new HBaseCommitTable(hbaseConf, config);
+ HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
// Components under test
Writer writer = commitTable.getWriter();
@@ -266,7 +266,7 @@ public class TestHBaseCommitTable {
public void testClosingClientEmptyQueuesProperly() throws Throwable {
HBaseCommitTableConfig config = new HBaseCommitTableConfig();
config.setTableName(TEST_TABLE);
- HBaseCommitTable commitTable = new HBaseCommitTable(hbaseConf, config);
+ HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
Writer writer = commitTable.getWriter();
HBaseCommitTable.HBaseClient client = (HBaseClient) commitTable.getClient();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionConnectionFactory.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionConnectionFactory.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionConnectionFactory.java
new file mode 100644
index 0000000..7a7485c
--- /dev/null
+++ b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionConnectionFactory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.InterRegionServerRpcControllerFactory;
+import org.apache.omid.HBaseShims;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RegionConnectionFactory {
+ public static final String COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRIES_NUMBER = "omid.commit.table.access.on.compaction.retries";
+ public static final String COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRY_PAUSE = "omid.commit.table.access.on.compaction.retry.pause";
+ public static final String COMMIT_TABLE_ACCESS_ON_READ_RETRIES_NUMBER = "omid.commit.table.access.on.read.retries";
+ public static final String COMMIT_TABLE_ACCESS_ON_READ_RETRY_PAUSE = "omid.commit.table.access.on.read.retry.pause";
+
+ private static final Logger LOG = LoggerFactory.getLogger(RegionConnectionFactory.class);
+ private static final int DEFAULT_COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRIES_NUMBER = 20;
+ private static final int DEFAULT_COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRY_PAUSE = 100;
+ // This setting controls how many retries occur on the region server if an
+ // IOException occurs while trying to access the commit table. Because a
+ // handler thread will be in use while these retries occur and the client
+ // will be blocked waiting, it must not tie up the call for longer than
+ // the client RPC timeout. Otherwise, the client will initiate retries on its
+ // end, tying up yet another handler thread. It's best if the retries can be
+ // zero, as in that case the handler is released and the retries occur on the
+ // client side. In testing, we've seen NoServerForRegionException occur which
+ // is a DoNotRetryIOException which are not retried on the client. It's not
+ // clear if this is a real issue or a test-only issue.
+ private static final int DEFAULT_COMMIT_TABLE_ACCESS_ON_READ_RETRIES_NUMBER = 11;
+ private static final int DEFAULT_COMMIT_TABLE_ACCESS_ON_READ_RETRY_PAUSE = 100;
+
+ // Private constructor: the class only exposes static members and is not meant to be instantiated.
+ private RegionConnectionFactory() {
+ }
+
+ /**
+ * Kinds of cached connection handed out by {@link #getConnection}. The type selects which
+ * configuration the connection is created with.
+ */
+ public static enum ConnectionType {
+ // Created with the compaction-specific retry count and retry pause.
+ COMPACTION_CONNECTION,
+ // Created with the read-specific retry count and retry pause.
+ READ_CONNECTION,
+ // Created with the coprocessor environment's configuration, unmodified.
+ DEFAULT_SERVER_CONNECTION;
+ }
+
+ private static Map<ConnectionType, Connection> connections =
+ new HashMap<ConnectionType, Connection>();
+
+ /**
+  * Builds an independent copy of a configuration by re-applying each of its entries onto a
+  * freshly created {@link Configuration}. This stands in for the copy constructor
+  * {@link Configuration#Configuration(Configuration)}, which doesn't copy all the config
+  * settings. See https://issues.apache.org/jira/browse/HBASE-18378.
+  * @param toCopy configuration to copy
+  * @return a new configuration on which every entry yielded by {@code toCopy}'s iterator has been set
+  */
+ private static Configuration cloneConfig(Configuration toCopy) {
+     Configuration copy = new Configuration();
+     // The for-each drives the same iterator() the configuration exposes, entry by entry.
+     for (Entry<String, String> setting : toCopy) {
+         copy.set(setting.getKey(), setting.getValue());
+     }
+     return copy;
+ }
+
+ /**
+ * Closes every cached connection and then empties the cache. Runs while holding the
+ * {@code RegionConnectionFactory} class lock.
+ */
+ public static void shutdown() {
+ synchronized (RegionConnectionFactory.class) {
+ for (Connection connection : connections.values()) {
+ try {
+ connection.close();
+ } catch (IOException e) {
+ // Best effort: log and keep going so the remaining connections still get closed.
+ LOG.warn("Unable to close coprocessor connection", e);
+ }
+ }
+ connections.clear();
+ }
+ }
+
+
+ /**
+  * Returns the shared connection for the given connection type, creating and caching it on
+  * first use.
+  *
+  * @param connectionType which kind of cached connection is wanted
+  * @param env coprocessor environment supplying the base configuration and the region server
+  * @return the cached connection for {@code connectionType}
+  * @throws IOException if a new connection has to be created and that creation fails
+  */
+ public static Connection getConnection(final ConnectionType connectionType, final RegionCoprocessorEnvironment env) throws IOException {
+     // The cache is a plain HashMap, so the lookup has to happen under the same lock that guards
+     // put() here and clear() in shutdown(); a read outside the lock races with those writers
+     // and gives no visibility guarantee for the Connection it returns.
+     synchronized (RegionConnectionFactory.class) {
+         Connection connection = connections.get(connectionType);
+         if (connection == null) {
+             connection = HBaseShims.newServerConnection(getTypeSpecificConfiguration(connectionType, env.getConfiguration()), env);
+             connections.put(connectionType, connection);
+         }
+         return connection;
+     }
+ }
+
+ private static Configuration getTypeSpecificConfiguration(ConnectionType connectionType, Configuration conf) {
+ switch (connectionType) {
+ case COMPACTION_CONNECTION:
+ return getCompactionConfig(conf);
+ case DEFAULT_SERVER_CONNECTION:
+ return conf;
+ case READ_CONNECTION:
+ return getReadConfig(conf);
+ default:
+ return conf;
+ }
+ }
+
+ /**
+  * Derives the configuration used for connections opened on behalf of compactions.
+  *
+  * @param conf the base configuration; it is read but not modified
+  * @return a copy of {@code conf} whose client retry count and retry pause are taken from the
+  *         compaction-specific settings, or from their defaults when those are not set
+  */
+ private static Configuration getCompactionConfig(Configuration conf) {
+     Configuration tuned = cloneConfig(conf);
+     int retries = conf.getInt(COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRIES_NUMBER,
+             DEFAULT_COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRIES_NUMBER);
+     int retryPause = conf.getInt(COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRY_PAUSE,
+             DEFAULT_COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRY_PAUSE);
+     // lower the number of rpc retries, so we don't hang the compaction
+     tuned.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
+     tuned.setInt(HConstants.HBASE_CLIENT_PAUSE, retryPause);
+     return tuned;
+ }
+
+ /**
+  * Derives the configuration used for connections opened on the read path.
+  *
+  * @param conf the base configuration; it is read but not modified
+  * @return a copy of {@code conf} whose client retry count and retry pause are taken from the
+  *         read-specific settings, or from their defaults when those are not set
+  */
+ private static Configuration getReadConfig(Configuration conf) {
+     Configuration readConfig = cloneConfig(conf);
+     // Apply the read-specific retry count and pause to commit table access on the read path
+     // (see the note on DEFAULT_COMMIT_TABLE_ACCESS_ON_READ_RETRIES_NUMBER for the rationale).
+     readConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+             conf.getInt(COMMIT_TABLE_ACCESS_ON_READ_RETRIES_NUMBER,
+                     DEFAULT_COMMIT_TABLE_ACCESS_ON_READ_RETRIES_NUMBER));
+     readConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
+             conf.getInt(COMMIT_TABLE_ACCESS_ON_READ_RETRY_PAUSE,
+                     DEFAULT_COMMIT_TABLE_ACCESS_ON_READ_RETRY_PAUSE));
+     return readConfig;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
index 901f819..0f39737 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.omid.HBaseShims;
@@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.CompactorScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionConnectionFactory;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -63,7 +63,7 @@ public class OmidCompactor extends BaseRegionObserver {
private boolean enableCompactorForAllFamilies = false;
private HBaseCommitTableConfig commitTableConf = null;
- private Configuration conf = null;
+ private RegionCoprocessorEnvironment env = null;
@VisibleForTesting
Queue<CommitTable.Client> commitTableClientQueue = new ConcurrentLinkedQueue<>();
@@ -86,14 +86,14 @@ public class OmidCompactor extends BaseRegionObserver {
@Override
public void start(CoprocessorEnvironment env) throws IOException {
LOG.info("Starting compactor coprocessor");
- conf = env.getConfiguration();
+ this.env = (RegionCoprocessorEnvironment) env;
commitTableConf = new HBaseCommitTableConfig();
- String commitTableName = conf.get(COMMIT_TABLE_NAME_KEY);
+ String commitTableName = env.getConfiguration().get(COMMIT_TABLE_NAME_KEY);
if (commitTableName != null) {
commitTableConf.setTableName(commitTableName);
}
retainNonTransactionallyDeletedCells =
- conf.getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
+ env.getConfiguration().getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
LOG.info("Compactor coprocessor started");
}
@@ -111,6 +111,7 @@ public class OmidCompactor extends BaseRegionObserver {
+ @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> env,
Store store,
InternalScanner scanner,
@@ -151,7 +152,7 @@ public class OmidCompactor extends BaseRegionObserver {
private CommitTable.Client initAndGetCommitTableClient() throws IOException {
LOG.info("Trying to get the commit table client");
- CommitTable commitTable = new HBaseCommitTable(conf, commitTableConf);
+ CommitTable commitTable = new HBaseCommitTable(RegionConnectionFactory.getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION, env), commitTableConf);
CommitTable.Client commitTableClient = commitTable.getClient();
LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName());
return commitTableClient;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
index c2ed187..7d49d06 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
@@ -34,13 +34,13 @@ import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
import org.apache.omid.HBaseShims;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.RegionAccessWrapper;
+import org.apache.hadoop.hbase.regionserver.RegionConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,9 +63,9 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
private static final Logger LOG = LoggerFactory.getLogger(OmidSnapshotFilter.class);
private HBaseCommitTableConfig commitTableConf = null;
- private Configuration conf = null;
+ private RegionCoprocessorEnvironment env = null;
private Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
- private Map<Object, SnapshotFilterImpl> snapshotFilterMap = new ConcurrentHashMap();
+ private Map<Object, SnapshotFilterImpl> snapshotFilterMap = new ConcurrentHashMap<>();
private CommitTable.Client inMemoryCommitTable = null;
public OmidSnapshotFilter(CommitTable.Client commitTableClient) {
@@ -80,9 +80,9 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
@Override
public void start(CoprocessorEnvironment env) {
LOG.info("Starting snapshot filter coprocessor");
- conf = env.getConfiguration();
+ this.env = (RegionCoprocessorEnvironment)env;
commitTableConf = new HBaseCommitTableConfig();
- String commitTableName = conf.get(COMMIT_TABLE_NAME_KEY);
+ String commitTableName = env.getConfiguration().get(COMMIT_TABLE_NAME_KEY);
if (commitTableName != null) {
commitTableConf.setTableName(commitTableName);
}
@@ -101,7 +101,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
}
- @Override
+ // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) {
SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(get);
if (snapshotFilter != null) {
@@ -110,7 +110,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
}
- @Override
+ // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
throws IOException {
@@ -138,6 +138,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
}
+ // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
Scan scan,
RegionScanner s) throws IOException {
@@ -146,6 +147,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
}
+ // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
Scan scan) throws IOException {
byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE);
@@ -165,8 +167,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
return;
}
-
-
+ // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
Scan scan,
RegionScanner s) {
@@ -183,8 +184,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
return s;
}
-
- @Override
+ // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
public void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s) {
SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(s);
if (snapshotFilter != null) {
@@ -210,7 +210,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
if (inMemoryCommitTable != null) {
return inMemoryCommitTable;
}
- CommitTable commitTable = new HBaseCommitTable(conf, commitTableConf);
+ CommitTable commitTable = new HBaseCommitTable(RegionConnectionFactory.getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, env), commitTableConf);
CommitTable.Client commitTableClient = commitTable.getClient();
LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName());
return commitTableClient;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-shims/hbase-1/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-1/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcController.java b/hbase-shims/hbase-1/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcController.java
new file mode 100644
index 0000000..369f7e2
--- /dev/null
+++ b/hbase-shims/hbase-1/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcController.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+
+import com.google.protobuf.RpcController;
+
+/**
+ * {@link RpcController} that assigns calls destined for non-system tables a fixed priority
+ * between normal and high. Calls destined for system tables are left to the superclass.
+ */
+class InterRegionServerRpcController extends DelegatingPayloadCarryingRpcController {
+ // Priority applied to calls targeting non-system tables.
+ private final int priority;
+
+ /**
+ * @param delegate controller that this controller wraps
+ * @param conf accepted for the caller's convenience; not read by this constructor
+ */
+ public InterRegionServerRpcController(PayloadCarryingRpcController delegate, Configuration conf) {
+ super(delegate);
+ // Set priority higher than normal, but lower than high
+ this.priority = (HConstants.HIGH_QOS + HConstants.NORMAL_QOS) / 2;
+ }
+
+ /**
+ * Sets the call priority based on the target table: system tables go through the superclass,
+ * every other table gets the fixed priority computed in the constructor.
+ */
+ @Override
+ public void setPriority(final TableName tn) {
+ if (tn.isSystemTable()) {
+ super.setPriority(tn);
+ } else {
+ setPriority(this.priority);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-shims/hbase-1/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-1/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcControllerFactory.java b/hbase-shims/hbase-1/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcControllerFactory.java
new file mode 100644
index 0000000..ed02abf
--- /dev/null
+++ b/hbase-shims/hbase-1/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcControllerFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * {@link RpcControllerFactory} that should only be used when making remote RPCs to the region
+ * servers. This prevents deadlocks by having RS->RS traffic handled by higher-priority
+ * queues. This controller factory shouldn't be globally configured anywhere and is meant to be
+ * used only internally by Omid.
+ */
+public class InterRegionServerRpcControllerFactory extends RpcControllerFactory {
+
+ public InterRegionServerRpcControllerFactory(Configuration conf) {
+ super(conf);
+ }
+
+ // Each newController overload obtains the superclass's controller and wraps it.
+ @Override
+ public PayloadCarryingRpcController newController() {
+ PayloadCarryingRpcController delegate = super.newController();
+ return getController(delegate);
+ }
+
+ @Override
+ public PayloadCarryingRpcController newController(CellScanner cellScanner) {
+ PayloadCarryingRpcController delegate = super.newController(cellScanner);
+ return getController(delegate);
+ }
+
+ @Override
+ public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
+ PayloadCarryingRpcController delegate = super.newController(cellIterables);
+ return getController(delegate);
+ }
+
+ // Wraps the given controller in an InterRegionServerRpcController.
+ private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
+ // construct a chain of controllers
+ return new InterRegionServerRpcController(delegate, conf);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-shims/hbase-1/src/main/java/org/apache/omid/HBaseShims.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-1/src/main/java/org/apache/omid/HBaseShims.java b/hbase-shims/hbase-1/src/main/java/org/apache/omid/HBaseShims.java
index c0aab90..cd24f49 100644
--- a/hbase-shims/hbase-1/src/main/java/org/apache/omid/HBaseShims.java
+++ b/hbase-shims/hbase-1/src/main/java/org/apache/omid/HBaseShims.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -24,6 +25,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.CoprocessorHConnection;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -84,4 +86,21 @@ public class HBaseShims {
admin.modifyColumn(table, cfDesc);
}
}
-}
+
+ /**
+ * For HBase 1.x, an HConstants.HBASE_CLIENT_RETRIES_NUMBER value of 0
+ * means no retries, while for 2.x a value of 1 means no retries.
+ * @return the retries setting that means "no retries" for this shim, which is 0
+ */
+ public static int getNoRetriesNumber() {
+ return 0;
+ }
+
+ /**
+ * Create an HBase Connection from the region server
+ * @param config configuration the new connection is created with
+ * @param env coprocessor environment whose region server services back the connection
+ * @return a new {@link CoprocessorHConnection}
+ * @throws IOException if constructing the connection fails
+ */
+ public static Connection newServerConnection(Configuration config, RegionCoprocessorEnvironment env) throws IOException {
+ // The cast assumes the environment's region server services are an HRegionServer.
+ return new CoprocessorHConnection(config, (HRegionServer)env.getRegionServerServices());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcController.java b/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcController.java
new file mode 100644
index 0000000..8b2da9f
--- /dev/null
+++ b/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcController.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+
+import com.google.protobuf.RpcController;
+
+/**
+ * {@link RpcController} that raises the priority of RPC calls destined for non-system tables
+ * to a level between normal and high; calls for system tables are left to the superclass.
+ */
+class InterRegionServerRpcController extends DelegatingHBaseRpcController {
+ private final int priority;
+
+ public InterRegionServerRpcController(HBaseRpcController delegate, Configuration conf) {
+ super(delegate);
+ // Set priority higher than normal, but lower than high
+ this.priority = (HConstants.HIGH_QOS + HConstants.NORMAL_QOS) / 2;
+ }
+
+ @Override
+ public void setPriority(final TableName tn) {
+ if (tn.isSystemTable()) {
+ super.setPriority(tn);
+ } else {
+ setPriority(this.priority);
+ }
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcControllerFactory.java b/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcControllerFactory.java
new file mode 100644
index 0000000..90c5990
--- /dev/null
+++ b/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerRpcControllerFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * {@link RpcControllerFactory} that should only be used when making remote RPCs to the region
+ * servers. This prevents deadlocks by having RS->RS traffic handled by higher-priority
+ * queues. This controller factory shouldn't be globally configured anywhere and is meant to be
+ * used only internally by Omid.
+ */
+public class InterRegionServerRpcControllerFactory extends RpcControllerFactory {
+
+ public InterRegionServerRpcControllerFactory(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public HBaseRpcController newController() {
+ HBaseRpcController delegate = super.newController();
+ return getController(delegate);
+ }
+
+ @Override
+ public HBaseRpcController newController(CellScanner cellScanner) {
+ HBaseRpcController delegate = super.newController(cellScanner);
+ return getController(delegate);
+ }
+
+ @Override
+ public HBaseRpcController newController(List<CellScannable> cellIterables) {
+ HBaseRpcController delegate = super.newController(cellIterables);
+ return getController(delegate);
+ }
+
+ private HBaseRpcController getController(HBaseRpcController delegate) {
+ // construct a chain of controllers
+ return new InterRegionServerRpcController(delegate, conf);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/hbase-shims/hbase-2/src/main/java/org/apache/omid/HBaseShims.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-2/src/main/java/org/apache/omid/HBaseShims.java b/hbase-shims/hbase-2/src/main/java/org/apache/omid/HBaseShims.java
index 3391649..e8dc7df 100644
--- a/hbase-shims/hbase-2/src/main/java/org/apache/omid/HBaseShims.java
+++ b/hbase-shims/hbase-2/src/main/java/org/apache/omid/HBaseShims.java
@@ -17,7 +17,9 @@
*/
package org.apache.omid;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@@ -68,7 +70,7 @@ public class HBaseShims {
}
public static CellComparator cellComparatorInstance() {
- return CellComparator.getInstance();
+ return CellComparatorImpl.COMPARATOR;
}
public static boolean OmidCompactionEnabled(ObserverContext<RegionCoprocessorEnvironment> env,
@@ -89,4 +91,20 @@ public class HBaseShims {
admin.modifyColumnFamily(table, cfBuilder.build());
}
}
+
+ /**
+ * For HBase 1.x, an HConstants.HBASE_CLIENT_RETRIES_NUMBER value of 0
+ * means no retries, while for 2.x a value of 1 means no retries.
+ * @return the retries-number setting that disables retries (1 for HBase 2.x)
+ */
+ public static int getNoRetriesNumber() {
+ return 1;
+ }
+
+ /**
+ * Create an HBase Connection from the region server
+ */
+ public static Connection newServerConnection(Configuration config, RegionCoprocessorEnvironment env) throws IOException {
+ return env.createConnection(config);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bd128bf..8373303 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,8 +123,6 @@
<!-- Basic properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-
-
<!-- 3rd-Party Library Versioning -->
<hbase0.version>0.98.10.1-hadoop1</hbase0.version>
<hbase1.version>1.2.5</hbase1.version>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index 25e124e..b4c794c 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.omid.proto.TSOProto;
-import org.apache.omid.transaction.TransactionException;
import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
import org.apache.omid.zk.ZKUtils;
import org.apache.statemachine.StateMachine;
@@ -59,7 +58,6 @@ import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
index 343610f..02eab90 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
@@ -17,10 +17,8 @@
*/
package org.apache.omid.tso.client;
-import java.util.List;
import java.util.Set;
-import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
/**
* Defines the protocol used on the client side to abstract communication to the TSO server
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java
index 426df27..645806a 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java
@@ -24,7 +24,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 04458f1..e5fbee8 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -39,7 +39,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index 54302d0..ab17ecc 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -25,7 +25,6 @@ import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.committable.InMemoryCommitTable;
import org.apache.omid.metrics.MetricsRegistry;
-import org.apache.omid.metrics.NullMetricsProvider;
import org.jboss.netty.channel.Channel;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
index a5f236c..4f5d351 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import static org.mockito.Matchers.any;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a40b6f8e/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java b/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java
index df59530..182c611 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import static org.mockito.Matchers.any;
@@ -38,7 +37,6 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
public class TestWorldTimeOracle {