You are viewing a plain text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/09/07 23:37:28 UTC
incubator-tephra git commit: TEPHRA-179 Create a new instance of TransactionManager and related classes when TransactionService becomes leader.
Repository: incubator-tephra
Updated Branches:
refs/heads/master 8e5ef26ad -> ae574caf7
TEPHRA-179 Create a new instance of TransactionManager and related classes when TransactionService becomes leader.
This closes #2 on GitHub.
Signed-off-by: poorna <po...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/ae574caf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/ae574caf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/ae574caf
Branch: refs/heads/master
Commit: ae574caf7e1adf62e495ba1a7a28ad8834eb98ea
Parents: 8e5ef26
Author: Ali Anwar <an...@berkeley.edu>
Authored: Fri Sep 2 17:08:18 2016 -0700
Committer: poorna <po...@apache.org>
Committed: Wed Sep 7 16:37:03 2016 -0700
----------------------------------------------------------------------
.../inmemory/InMemoryTransactionService.java | 6 +-
.../runtime/TransactionDistributedModule.java | 15 +--
.../runtime/TransactionInMemoryModule.java | 11 +-
.../tephra/runtime/TransactionLocalModule.java | 8 +-
.../ThriftTransactionServerTest.java | 100 ++++++++++++++-----
.../tephra/examples/BalanceBooksTest.java | 1 +
6 files changed, 101 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
index 823f934..fb45362 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
@@ -43,6 +43,7 @@ public class InMemoryTransactionService extends AbstractService {
private final DiscoveryService discoveryService;
private final String serviceName;
+ // this is Provider, so that we can have multiple instances of it (use a new instance after leader election)
protected final Provider<TransactionManager> txManagerProvider;
private Cancellable cancelDiscovery;
protected TransactionManager txManager;
@@ -55,9 +56,8 @@ public class InMemoryTransactionService extends AbstractService {
protected final int maxReadBufferBytes;
@Inject
- public InMemoryTransactionService(Configuration conf,
- DiscoveryService discoveryService,
- Provider<TransactionManager> txManagerProvider) {
+ public InMemoryTransactionService(Configuration conf, DiscoveryService discoveryService,
+ Provider<TransactionManager> txManagerProvider) {
this.discoveryService = discoveryService;
this.txManagerProvider = txManagerProvider;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
index aaf3534..ff796c1 100644
--- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
@@ -19,6 +19,7 @@
package org.apache.tephra.runtime;
import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
@@ -41,14 +42,16 @@ final class TransactionDistributedModule extends AbstractModule {
@Override
protected void configure() {
+ // some of these classes need to be non-singleton in order to create a new instance during leader() in
+ // TransactionService
bind(SnapshotCodecProvider.class).in(Singleton.class);
- bind(TransactionStateStorage.class).annotatedWith(Names.named("persist"))
- .to(HDFSTransactionStateStorage.class).in(Singleton.class);
- bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class);
+ bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")).to(HDFSTransactionStateStorage.class);
+ bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class);
- bind(TransactionManager.class).in(Singleton.class);
- bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Singleton.class);
- bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Singleton.class);
+ // bind explicitly (unscoped) so that binding errors surface when the injector is created
+ bind(TransactionManager.class);
+ bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Scopes.SINGLETON);
+ bind(MetricsCollector.class).to(DefaultMetricsCollector.class);
install(new FactoryModuleBuilder()
.implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
index de7678a..1b9032c 100644
--- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
@@ -19,6 +19,7 @@
package org.apache.tephra.runtime;
import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import org.apache.tephra.DefaultTransactionExecutor;
@@ -43,10 +44,12 @@ public class TransactionInMemoryModule extends AbstractModule {
@Override
protected void configure() {
- bind(SnapshotCodecProvider.class).in(Singleton.class);
- bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class).in(Singleton.class);
- bind(TransactionManager.class).in(Singleton.class);
- bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
+ // some of these classes need to be non-singleton in order to create a new instance during leader() in
+ // TransactionService
+ bind(SnapshotCodecProvider.class).in(Scopes.SINGLETON);
+ bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class);
+ bind(TransactionManager.class);
+ bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Scopes.SINGLETON);
// no metrics output for in-memory
bind(MetricsCollector.class).to(TxMetricsCollector.class);
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
index 7d0b663..4a79e8d 100644
--- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
@@ -41,12 +41,14 @@ final class TransactionLocalModule extends AbstractModule {
@Override
protected void configure() {
+ // some of these classes need to be non-singleton in order to create a new instance during leader() in
+ // TransactionService
bind(SnapshotCodecProvider.class).in(Singleton.class);
bind(TransactionStateStorage.class).annotatedWith(Names.named("persist"))
- .to(LocalFileTransactionStateStorage.class).in(Singleton.class);
- bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class);
+ .to(LocalFileTransactionStateStorage.class);
+ bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class);
- bind(TransactionManager.class).in(Singleton.class);
+ bind(TransactionManager.class);
bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
bind(MetricsCollector.class).to(DefaultMetricsCollector.class);
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
index a930720..bbe03ed 100644
--- a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
@@ -27,6 +27,7 @@ import com.google.inject.Scopes;
import com.google.inject.util.Modules;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.ThriftTransactionSystemTest;
+import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TxConstants;
import org.apache.tephra.persist.InMemoryTransactionStateStorage;
@@ -43,10 +44,9 @@ import org.apache.twill.zookeeper.ZKClientService;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -62,6 +63,7 @@ import java.util.concurrent.TimeUnit;
/**
* This tests whether transaction service hangs on stop when heavily loaded - https://issues.cask.co/browse/TEPHRA-132
+ * as well as proper handling of zk election https://issues.cask.co/browse/TEPHRA-179.
*/
public class ThriftTransactionServerTest {
private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);
@@ -70,17 +72,18 @@ public class ThriftTransactionServerTest {
private static ZKClientService zkClientService;
private static TransactionService txService;
private static TransactionStateStorage storage;
- static Injector injector;
+ private static Injector injector;
private static final int NUM_CLIENTS = 17;
- private static final CountDownLatch STORAGE_WAIT_LATCH = new CountDownLatch(1);
- private static final CountDownLatch CLIENTS_DONE_LATCH = new CountDownLatch(NUM_CLIENTS);
+ // storageWaitLatch is used to simulate slow HDFS writes for TEPHRA-132
+ private static CountDownLatch storageWaitLatch;
+ private static CountDownLatch clientsDoneLatch;
@ClassRule
public static TemporaryFolder tmpFolder = new TemporaryFolder();
- @BeforeClass
- public static void start() throws Exception {
+ @Before
+ public void start() throws Exception {
zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
zkServer.startAndWait();
@@ -103,6 +106,8 @@ public class ThriftTransactionServerTest {
@Override
protected void configure() {
bind(TransactionStateStorage.class).to(SlowTransactionStorage.class).in(Scopes.SINGLETON);
+ // overriding this to make it non-singleton
+ bind(TransactionSystemClient.class).to(TransactionServiceClient.class);
}
}),
new TransactionClientModule()
@@ -120,15 +125,15 @@ public class ThriftTransactionServerTest {
} catch (Exception e) {
LOG.error("Failed to start service: ", e);
}
- }
- @Before
- public void reset() throws Exception {
getClient().resetState();
+
+ storageWaitLatch = new CountDownLatch(1);
+ clientsDoneLatch = new CountDownLatch(NUM_CLIENTS);
}
- @AfterClass
- public static void stop() throws Exception {
+ @After
+ public void stop() throws Exception {
txService.stopAndWait();
storage.stopAndWait();
zkClientService.stopAndWait();
@@ -141,6 +146,8 @@ public class ThriftTransactionServerTest {
@Test
public void testThriftServerStop() throws Exception {
+ Assert.assertEquals(Service.State.RUNNING, txService.thriftRPCServerState());
+
int nThreads = NUM_CLIENTS;
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
for (int i = 0; i < nThreads; ++i) {
@@ -149,7 +156,8 @@ public class ThriftTransactionServerTest {
public void run() {
try {
TransactionSystemClient txClient = getClient();
- CLIENTS_DONE_LATCH.countDown();
+ clientsDoneLatch.countDown();
+ // this will block on the slow edit log until storageWaitLatch is counted down
txClient.startShort();
} catch (Exception e) {
// Exception expected
@@ -158,23 +166,53 @@ public class ThriftTransactionServerTest {
});
}
- // Wait till all clients finish sending reqeust to transaction manager
- CLIENTS_DONE_LATCH.await();
+ // Wait till all clients finish sending request to transaction manager
+ clientsDoneLatch.await();
TimeUnit.SECONDS.sleep(1);
// Expire zookeeper session, which causes Thrift server to stop.
expireZkSession(zkClientService);
- waitForThriftTermination();
+ waitForThriftStop();
- // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift sever again.
+ // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift server again.
zkClientService.stopAndWait();
- STORAGE_WAIT_LATCH.countDown();
+ storageWaitLatch.countDown();
TimeUnit.SECONDS.sleep(1);
// Make sure Thrift server stopped.
Assert.assertEquals(Service.State.TERMINATED, txService.thriftRPCServerState());
}
+ @Test
+ public void testThriftServerRestart() throws Exception {
+ // we don't need a slow Transaction Log for this test case
+ storageWaitLatch.countDown();
+ Assert.assertEquals(Service.State.RUNNING, txService.thriftRPCServerState());
+
+ // simply start + commit transaction
+ TransactionSystemClient txClient = getClient();
+ Transaction tx = txClient.startShort();
+ txClient.commit(tx);
+
+ // Expire zookeeper session, which causes Thrift server to stop running.
+ expireZkSession(zkClientService);
+ waitForThriftStop();
+
+ // wait for the thrift rpc server to be in running state again
+ waitFor("Failed to wait for txService to be running.", new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return Service.State.RUNNING == txService.thriftRPCServerState();
+ }
+ });
+
+ // we need to get a new txClient, because the old one will no longer work after the thrift server restart
+ txClient = getClient();
+ // verify that we can start and commit a transaction after becoming leader again
+ tx = txClient.startShort();
+ txClient.commit(tx);
+ }
+
private void expireZkSession(ZKClientService zkClientService) throws Exception {
ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get();
final SettableFuture<?> connectFuture = SettableFuture.create();
@@ -188,7 +226,7 @@ public class ThriftTransactionServerTest {
};
// Create another Zookeeper session with the same sessionId so that the original one expires.
- final ZooKeeper dupZookeeper =
+ ZooKeeper dupZookeeper =
new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher,
zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
connectFuture.get(30, TimeUnit.SECONDS);
@@ -196,13 +234,27 @@ public class ThriftTransactionServerTest {
dupZookeeper.close();
}
- private void waitForThriftTermination() throws InterruptedException {
- int count = 0;
- while (txService.thriftRPCServerState() != Service.State.TERMINATED && count++ < 200) {
+ private void waitFor(String errorMessage, Callable<Boolean> callable) throws Exception {
+ for (int i = 0; i < 200; i++) {
+ boolean value = callable.call();
+ if (value) {
+ return;
+ }
TimeUnit.MILLISECONDS.sleep(50);
}
+ Assert.fail(errorMessage);
+ }
+
+ private void waitForThriftStop() throws Exception {
+ waitFor("Failed to wait for txService to stop", new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return Service.State.RUNNING != txService.thriftRPCServerState();
+ }
+ });
}
+ // the edit log will block until a countdown latch is decremented to simulate heavy load for TEPHRA-132
private static class SlowTransactionStorage extends InMemoryTransactionStateStorage {
@Override
public TransactionLog createLog(long timestamp) throws IOException {
@@ -218,7 +270,7 @@ public class ThriftTransactionServerTest {
@Override
public void append(TransactionEdit edit) throws IOException {
try {
- STORAGE_WAIT_LATCH.await();
+ storageWaitLatch.await();
} catch (InterruptedException e) {
LOG.error("Got exception: ", e);
}
@@ -228,7 +280,7 @@ public class ThriftTransactionServerTest {
@Override
public void append(List<TransactionEdit> edits) throws IOException {
try {
- STORAGE_WAIT_LATCH.await();
+ storageWaitLatch.await();
} catch (InterruptedException e) {
LOG.error("Got exception: ", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
----------------------------------------------------------------------
diff --git a/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java b/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
index 3be1c43..1abeece 100644
--- a/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
+++ b/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
@@ -107,6 +107,7 @@ public class BalanceBooksTest {
txService.startAndWait();
} catch (Exception e) {
LOG.error("Failed to start service: ", e);
+ throw e;
}
}