You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/06/24 01:30:58 UTC

incubator-twill git commit: (TWILL-139) Make ZKClient state change based on authentication result

Repository: incubator-twill
Updated Branches:
  refs/heads/feature/TWILL-139 [created] 5971eed11


(TWILL-139) Make ZKClient state change based on authentication result

- If authentication is enabled, only move the ZKClient to RUNNING state
  when authentication passed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/5971eed1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/5971eed1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/5971eed1

Branch: refs/heads/feature/TWILL-139
Commit: 5971eed110b58ae62da5da43042bae3c5b631b18
Parents: d864ed1
Author: Terence Yim <ch...@apache.org>
Authored: Tue Jun 23 16:24:14 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jun 23 16:30:54 2015 -0700

----------------------------------------------------------------------
 pom.xml                                         |   5 +
 .../twill/internal/ZKAuthClientKafkaTest.java   | 120 +++++++++++++++++++
 twill-zookeeper/pom.xml                         |   5 +
 .../zookeeper/DefaultZKClientService.java       |  62 +++++++---
 4 files changed, 178 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5971eed1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1556534..56245f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -799,6 +799,11 @@
                 <version>${slf4j.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>log4j-over-slf4j</artifactId>
+                <version>${slf4j.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>ch.qos.logback</groupId>
                 <artifactId>logback-core</artifactId>
                 <version>${logback.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5971eed1/twill-core/src/test/java/org/apache/twill/internal/ZKAuthClientKafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/ZKAuthClientKafkaTest.java b/twill-core/src/test/java/org/apache/twill/internal/ZKAuthClientKafkaTest.java
new file mode 100644
index 0000000..982b643
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/ZKAuthClientKafkaTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.utils.Networks;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKOperations;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Properties;
+
+/**
+ * Test EmbeddedKafkaServer hanging issue with zookeeper authentication (TWILL-139).
+ */
+public class ZKAuthClientKafkaTest {
+
+  @ClassRule
+  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+  private static InMemoryZKServer zkServer;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    // Create the JAAS config file
+    File jaasConf = TMP_FOLDER.newFile();
+    try (PrintWriter writer = new PrintWriter(Files.newWriter(jaasConf, Charsets.UTF_8))) {
+      writer.println("Server {");
+      writer.println("  org.apache.zookeeper.server.auth.DigestLoginModule required");
+      writer.println("  user_admin=\"adminsecret\";");
+      writer.println("};");
+
+      writer.println("Client {");
+      writer.println("  org.apache.zookeeper.server.auth.DigestLoginModule required");
+      writer.println("  username=\"admin\"");
+      writer.println("  password=\"adminsecret\";");
+      writer.println("};");
+    }
+
+    System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
+    zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).setAutoCleanDataDir(false).build();
+    zkServer.startAndWait();
+  }
+
+  @AfterClass
+  public static void finish() {
+    zkServer.stopAndWait();
+    System.getProperties().remove("java.security.auth.login.config");
+  }
+
+  @Test (timeout = 30000)
+  public void testZKKafkaHang() throws Exception {
+    // This is a test for TWILL-139. Without the fix, there is a chance that the test timeout.
+    ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/kafka").build();
+    EmbeddedKafkaServer kafka = new EmbeddedKafkaServer(generateKafkaConfig(zkClient.getConnectString()));
+
+    zkClient.startAndWait();
+    ZKOperations.ignoreError(zkClient.create("/", null, CreateMode.PERSISTENT),
+                             KeeperException.NodeExistsException.class, null).get();
+    try {
+      try {
+        kafka.startAndWait();
+      } finally {
+        kafka.stopAndWait();
+      }
+    } finally {
+      zkClient.stopAndWait();
+    }
+  }
+
+  private Properties generateKafkaConfig(String kafkaZKConnect) throws IOException {
+    int port = Networks.getRandomPort();
+    Preconditions.checkState(port > 0, "Failed to get random port.");
+
+    Properties prop = new Properties();
+    prop.setProperty("log.dir", TMP_FOLDER.newFolder().getAbsolutePath());
+    prop.setProperty("port", Integer.toString(port));
+    prop.setProperty("broker.id", "1");
+    prop.setProperty("socket.send.buffer.bytes", "1048576");
+    prop.setProperty("socket.receive.buffer.bytes", "1048576");
+    prop.setProperty("socket.request.max.bytes", "104857600");
+    prop.setProperty("num.partitions", "1");
+    prop.setProperty("log.retention.hours", "24");
+    prop.setProperty("log.flush.interval.messages", "10000");
+    prop.setProperty("log.flush.interval.ms", "1000");
+    prop.setProperty("log.segment.bytes", "536870912");
+    prop.setProperty("zookeeper.connect", kafkaZKConnect);
+    prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
+    prop.setProperty("default.replication.factor", "1");
+    return prop;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5971eed1/twill-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/pom.xml b/twill-zookeeper/pom.xml
index 0d2edda..d116207 100644
--- a/twill-zookeeper/pom.xml
+++ b/twill-zookeeper/pom.xml
@@ -68,5 +68,10 @@
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5971eed1/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index dc2bfa9..571203e 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -43,6 +43,7 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -179,17 +180,17 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
             // Create the requested path again
             Futures.addCallback(
               doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() {
-              @Override
-              public void onSuccess(String pathResult) {
-                result.set(pathResult);
-              }
-
-              @Override
-              public void onFailure(Throwable t) {
-                // handle the failure
-                updateFailureResult(t, result, path, ignoreNodeExists);
-              }
-            });
+                @Override
+                public void onSuccess(String pathResult) {
+                  result.set(pathResult);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                  // handle the failure
+                  updateFailureResult(t, result, path, ignoreNodeExists);
+                }
+              });
           }
 
           @Override
@@ -466,9 +467,7 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
       }
 
       try {
-        if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) {
-          LOG.debug("Connected to ZooKeeper: {}", zkStr);
-          notifyStarted();
+        if (state == State.STARTING && doStartStateChange(event.getState())) {
           return;
         }
         if (event.getState() == Event.KeeperState.Expired) {
@@ -503,6 +502,41 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
       }
     }
 
+    /**
+     * Changes the service state from STARTING to either RUNNING or FAILED
+     *
+     * @return true if state changed
+     */
+    private boolean doStartStateChange(Event.KeeperState keeperState) {
+      // If failed to authenticate, make the starting of this service fail.
+      if (keeperState == Event.KeeperState.AuthFailed) {
+        notifyFailed(new KeeperException.AuthFailedException());
+        return true;
+      }
+      // If authentication passed, notify service started.
+      if (keeperState == Event.KeeperState.SaslAuthenticated) {
+        LOG.debug("Authenticated and connected to ZooKeeper: {}", zkStr);
+        notifyStarted();
+        return true;
+      }
+      // If SyncConnected without authentication, notify service started. If authentication is in process, don't
+      // notify, as we expect there will be SaslAuthenticated or AuthFailed event.
+      if (keeperState == Event.KeeperState.SyncConnected) {
+        ZooKeeperSaslClient saslClient = zooKeeper.get().getSaslClient();
+        if (saslClient == null) {
+          // Shouldn't be null. If null, it means authentication already failed earlier, meaning the AuthFailed should
+          // have been triggered
+          notifyFailed(new KeeperException.AuthFailedException());
+          return true;
+        }
+        if (!saslClient.clientTunneledAuthenticationInProgress()) {
+          LOG.debug("Connected to ZooKeeper: {}", zkStr);
+          notifyStarted();
+          return true;
+        }
+      }
+      return false;
+    }
 
     /**
      * Creates a {@link Runnable} task that will get executed in the event executor for transiting this