You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/09/07 23:37:28 UTC

incubator-tephra git commit: TEPHRA-179 Create a new instance of TransactionManager and related classes when TransactionService becomes leader.

Repository: incubator-tephra
Updated Branches:
  refs/heads/master 8e5ef26ad -> ae574caf7


TEPHRA-179 Create a new instance of TransactionManager and related classes when TransactionService becomes leader.

This closes #2 on GitHub.

Signed-off-by: poorna <po...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/ae574caf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/ae574caf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/ae574caf

Branch: refs/heads/master
Commit: ae574caf7e1adf62e495ba1a7a28ad8834eb98ea
Parents: 8e5ef26
Author: Ali Anwar <an...@berkeley.edu>
Authored: Fri Sep 2 17:08:18 2016 -0700
Committer: poorna <po...@apache.org>
Committed: Wed Sep 7 16:37:03 2016 -0700

----------------------------------------------------------------------
 .../inmemory/InMemoryTransactionService.java    |   6 +-
 .../runtime/TransactionDistributedModule.java   |  15 +--
 .../runtime/TransactionInMemoryModule.java      |  11 +-
 .../tephra/runtime/TransactionLocalModule.java  |   8 +-
 .../ThriftTransactionServerTest.java            | 100 ++++++++++++++-----
 .../tephra/examples/BalanceBooksTest.java       |   1 +
 6 files changed, 101 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
index 823f934..fb45362 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTransactionService.java
@@ -43,6 +43,7 @@ public class InMemoryTransactionService extends AbstractService {
 
   private final DiscoveryService discoveryService;
   private final String serviceName;
+  // this is a Provider, so that we can have multiple instances of it (use a new instance after leader election)
   protected final Provider<TransactionManager> txManagerProvider;
   private Cancellable cancelDiscovery;
   protected TransactionManager txManager;
@@ -55,9 +56,8 @@ public class InMemoryTransactionService extends AbstractService {
   protected final int maxReadBufferBytes;
 
   @Inject
-  public InMemoryTransactionService(Configuration conf,
-                            DiscoveryService discoveryService,
-                            Provider<TransactionManager> txManagerProvider) {
+  public InMemoryTransactionService(Configuration conf, DiscoveryService discoveryService,
+                                    Provider<TransactionManager> txManagerProvider) {
 
     this.discoveryService = discoveryService;
     this.txManagerProvider = txManagerProvider;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
index aaf3534..ff796c1 100644
--- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
@@ -19,6 +19,7 @@
 package org.apache.tephra.runtime;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
@@ -41,14 +42,16 @@ final class TransactionDistributedModule extends AbstractModule {
 
   @Override
   protected void configure() {
+    // some of these classes need to be non-singleton in order to create a new instance during leader() in
+    // TransactionService
     bind(SnapshotCodecProvider.class).in(Singleton.class);
-    bind(TransactionStateStorage.class).annotatedWith(Names.named("persist"))
-      .to(HDFSTransactionStateStorage.class).in(Singleton.class);
-    bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class);
+    bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")).to(HDFSTransactionStateStorage.class);
+    bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class);
 
-    bind(TransactionManager.class).in(Singleton.class);
-    bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Singleton.class);
-    bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Singleton.class);
+    // to catch issues during configure time
+    bind(TransactionManager.class);
+    bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Scopes.SINGLETON);
+    bind(MetricsCollector.class).to(DefaultMetricsCollector.class);
 
     install(new FactoryModuleBuilder()
         .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
index de7678a..1b9032c 100644
--- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
@@ -19,6 +19,7 @@
 package org.apache.tephra.runtime;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import org.apache.tephra.DefaultTransactionExecutor;
@@ -43,10 +44,12 @@ public class TransactionInMemoryModule extends AbstractModule {
 
   @Override
   protected void configure() {
-    bind(SnapshotCodecProvider.class).in(Singleton.class);
-    bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class).in(Singleton.class);
-    bind(TransactionManager.class).in(Singleton.class);
-    bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
+    // some of these classes need to be non-singleton in order to create a new instance during leader() in
+    // TransactionService
+    bind(SnapshotCodecProvider.class).in(Scopes.SINGLETON);
+    bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class);
+    bind(TransactionManager.class);
+    bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Scopes.SINGLETON);
     // no metrics output for in-memory
     bind(MetricsCollector.class).to(TxMetricsCollector.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
index 7d0b663..4a79e8d 100644
--- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
@@ -41,12 +41,14 @@ final class TransactionLocalModule extends AbstractModule {
 
   @Override
   protected void configure() {
+    // some of these classes need to be non-singleton in order to create a new instance during leader() in
+    // TransactionService
     bind(SnapshotCodecProvider.class).in(Singleton.class);
     bind(TransactionStateStorage.class).annotatedWith(Names.named("persist"))
-      .to(LocalFileTransactionStateStorage.class).in(Singleton.class);
-    bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class);
+      .to(LocalFileTransactionStateStorage.class);
+    bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class);
 
-    bind(TransactionManager.class).in(Singleton.class);
+    bind(TransactionManager.class);
     bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
     bind(MetricsCollector.class).to(DefaultMetricsCollector.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
index a930720..bbe03ed 100644
--- a/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/ThriftTransactionServerTest.java
@@ -27,6 +27,7 @@ import com.google.inject.Scopes;
 import com.google.inject.util.Modules;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.ThriftTransactionSystemTest;
+import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.persist.InMemoryTransactionStateStorage;
@@ -43,10 +44,9 @@ import org.apache.twill.zookeeper.ZKClientService;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -62,6 +63,7 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * This tests whether transaction service hangs on stop when heavily loaded - https://issues.cask.co/browse/TEPHRA-132
+ * as well as proper handling of zk election https://issues.cask.co/browse/TEPHRA-179.
  */
 public class ThriftTransactionServerTest {
   private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);
@@ -70,17 +72,18 @@ public class ThriftTransactionServerTest {
   private static ZKClientService zkClientService;
   private static TransactionService txService;
   private static TransactionStateStorage storage;
-  static Injector injector;
+  private static Injector injector;
 
   private static final int NUM_CLIENTS = 17;
-  private static final CountDownLatch STORAGE_WAIT_LATCH = new CountDownLatch(1);
-  private static final CountDownLatch CLIENTS_DONE_LATCH = new CountDownLatch(NUM_CLIENTS);
+  // storageWaitLatch is used to simulate slow HDFS writes for TEPHRA-132
+  private static CountDownLatch storageWaitLatch;
+  private static CountDownLatch clientsDoneLatch;
 
   @ClassRule
   public static TemporaryFolder tmpFolder = new TemporaryFolder();
 
-  @BeforeClass
-  public static void start() throws Exception {
+  @Before
+  public void start() throws Exception {
     zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
     zkServer.startAndWait();
 
@@ -103,6 +106,8 @@ public class ThriftTransactionServerTest {
           @Override
           protected void configure() {
             bind(TransactionStateStorage.class).to(SlowTransactionStorage.class).in(Scopes.SINGLETON);
+            // overriding this to make it non-singleton
+            bind(TransactionSystemClient.class).to(TransactionServiceClient.class);
           }
         }),
       new TransactionClientModule()
@@ -120,15 +125,15 @@ public class ThriftTransactionServerTest {
     } catch (Exception e) {
       LOG.error("Failed to start service: ", e);
     }
-  }
 
-  @Before
-  public void reset() throws Exception {
     getClient().resetState();
+
+    storageWaitLatch = new CountDownLatch(1);
+    clientsDoneLatch = new CountDownLatch(NUM_CLIENTS);
   }
 
-  @AfterClass
-  public static void stop() throws Exception {
+  @After
+  public void stop() throws Exception {
     txService.stopAndWait();
     storage.stopAndWait();
     zkClientService.stopAndWait();
@@ -141,6 +146,8 @@ public class ThriftTransactionServerTest {
 
   @Test
   public void testThriftServerStop() throws Exception {
+    Assert.assertEquals(Service.State.RUNNING, txService.thriftRPCServerState());
+
     int nThreads = NUM_CLIENTS;
     ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
     for (int i = 0; i < nThreads; ++i) {
@@ -149,7 +156,8 @@ public class ThriftTransactionServerTest {
         public void run() {
           try {
             TransactionSystemClient txClient = getClient();
-            CLIENTS_DONE_LATCH.countDown();
+            clientsDoneLatch.countDown();
+            // this will hang, due to the slow edit log (until the latch it waits on is counted down)
             txClient.startShort();
           } catch (Exception e) {
             // Exception expected
@@ -158,23 +166,53 @@ public class ThriftTransactionServerTest {
       });
     }
 
-    // Wait till all clients finish sending reqeust to transaction manager
-    CLIENTS_DONE_LATCH.await();
+    // Wait till all clients finish sending request to transaction manager
+    clientsDoneLatch.await();
     TimeUnit.SECONDS.sleep(1);
 
     // Expire zookeeper session, which causes Thrift server to stop.
     expireZkSession(zkClientService);
-    waitForThriftTermination();
+    waitForThriftStop();
 
-    // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift sever again.
+    // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift server again.
     zkClientService.stopAndWait();
-    STORAGE_WAIT_LATCH.countDown();
+    storageWaitLatch.countDown();
     TimeUnit.SECONDS.sleep(1);
 
     // Make sure Thrift server stopped.
     Assert.assertEquals(Service.State.TERMINATED, txService.thriftRPCServerState());
   }
 
+  @Test
+  public void testThriftServerRestart() throws Exception {
+    // we don't need a slow Transaction Log for this test case
+    storageWaitLatch.countDown();
+    Assert.assertEquals(Service.State.RUNNING, txService.thriftRPCServerState());
+
+    // simply start + commit transaction
+    TransactionSystemClient txClient = getClient();
+    Transaction tx = txClient.startShort();
+    txClient.commit(tx);
+
+    // Expire zookeeper session, which causes Thrift server to stop running.
+    expireZkSession(zkClientService);
+    waitForThriftStop();
+
+    // wait for the thrift rpc server to be in running state again
+    waitFor("Failed to wait for txService to be running.", new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        return Service.State.RUNNING == txService.thriftRPCServerState();
+      }
+    });
+
+    // we need to get a new txClient, because the old one will no longer work after the thrift server restart
+    txClient = getClient();
+    // verify that we can start and commit a transaction after becoming leader again
+    tx = txClient.startShort();
+    txClient.commit(tx);
+  }
+
   private void expireZkSession(ZKClientService zkClientService) throws Exception {
     ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get();
     final SettableFuture<?> connectFuture = SettableFuture.create();
@@ -188,7 +226,7 @@ public class ThriftTransactionServerTest {
     };
 
     // Create another Zookeeper session with the same sessionId so that the original one expires.
-    final ZooKeeper dupZookeeper =
+    ZooKeeper dupZookeeper =
       new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher,
                     zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
     connectFuture.get(30, TimeUnit.SECONDS);
@@ -196,13 +234,27 @@ public class ThriftTransactionServerTest {
     dupZookeeper.close();
   }
 
-  private void waitForThriftTermination() throws InterruptedException {
-    int count = 0;
-    while (txService.thriftRPCServerState() != Service.State.TERMINATED && count++ < 200) {
+  private void waitFor(String errorMessage, Callable<Boolean> callable) throws Exception {
+    for (int i = 0; i < 200; i++) {
+      boolean value = callable.call();
+      if (value) {
+        return;
+      }
       TimeUnit.MILLISECONDS.sleep(50);
     }
+    Assert.fail(errorMessage);
+  }
+
+  private void waitForThriftStop() throws Exception {
+    waitFor("Failed to wait for txService to stop", new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        return Service.State.RUNNING != txService.thriftRPCServerState();
+      }
+    });
   }
 
+  // the edit log will block until a countdown latch is decremented to simulate heavy load for TEPHRA-132
   private static class SlowTransactionStorage extends InMemoryTransactionStateStorage {
     @Override
     public TransactionLog createLog(long timestamp) throws IOException {
@@ -218,7 +270,7 @@ public class ThriftTransactionServerTest {
     @Override
     public void append(TransactionEdit edit) throws IOException {
       try {
-        STORAGE_WAIT_LATCH.await();
+        storageWaitLatch.await();
       } catch (InterruptedException e) {
         LOG.error("Got exception: ", e);
       }
@@ -228,7 +280,7 @@ public class ThriftTransactionServerTest {
     @Override
     public void append(List<TransactionEdit> edits) throws IOException {
       try {
-        STORAGE_WAIT_LATCH.await();
+        storageWaitLatch.await();
       } catch (InterruptedException e) {
         LOG.error("Got exception: ", e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/ae574caf/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
----------------------------------------------------------------------
diff --git a/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java b/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
index 3be1c43..1abeece 100644
--- a/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
+++ b/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
@@ -107,6 +107,7 @@ public class BalanceBooksTest {
       txService.startAndWait();
     } catch (Exception e) {
       LOG.error("Failed to start service: ", e);
+      throw e;
     }
 
   }