You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/06/24 01:30:58 UTC
incubator-twill git commit: (TWILL-139) Make ZKClient state change
based on authentication result
Repository: incubator-twill
Updated Branches:
refs/heads/feature/TWILL-139 [created] 5971eed11
(TWILL-139) Make ZKClient state change based on authentication result
- If authentication is enabled, only move the ZKClient to the RUNNING state
once authentication has passed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/5971eed1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/5971eed1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/5971eed1
Branch: refs/heads/feature/TWILL-139
Commit: 5971eed110b58ae62da5da43042bae3c5b631b18
Parents: d864ed1
Author: Terence Yim <ch...@apache.org>
Authored: Tue Jun 23 16:24:14 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jun 23 16:30:54 2015 -0700
----------------------------------------------------------------------
pom.xml | 5 +
.../twill/internal/ZKAuthClientKafkaTest.java | 120 +++++++++++++++++++
twill-zookeeper/pom.xml | 5 +
.../zookeeper/DefaultZKClientService.java | 62 +++++++---
4 files changed, 178 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5971eed1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1556534..56245f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -799,6 +799,11 @@
<version>${slf4j.version}</version>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5971eed1/twill-core/src/test/java/org/apache/twill/internal/ZKAuthClientKafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/ZKAuthClientKafkaTest.java b/twill-core/src/test/java/org/apache/twill/internal/ZKAuthClientKafkaTest.java
new file mode 100644
index 0000000..982b643
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/ZKAuthClientKafkaTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.utils.Networks;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKOperations;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Properties;
+
+/**
+ * Test EmbeddedKafkaServer hanging issue with zookeeper authentication (TWILL-139).
+ */
+public class ZKAuthClientKafkaTest {
+
+ @ClassRule
+ public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+ private static InMemoryZKServer zkServer;
+
+ @BeforeClass
+ public static void init() throws IOException {
+ // Create the JAAS config file
+ File jaasConf = TMP_FOLDER.newFile();
+ try (PrintWriter writer = new PrintWriter(Files.newWriter(jaasConf, Charsets.UTF_8))) {
+ writer.println("Server {");
+ writer.println(" org.apache.zookeeper.server.auth.DigestLoginModule required");
+ writer.println(" user_admin=\"adminsecret\";");
+ writer.println("};");
+
+ writer.println("Client {");
+ writer.println(" org.apache.zookeeper.server.auth.DigestLoginModule required");
+ writer.println(" username=\"admin\"");
+ writer.println(" password=\"adminsecret\";");
+ writer.println("};");
+ }
+
+ System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
+ zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).setAutoCleanDataDir(false).build();
+ zkServer.startAndWait();
+ }
+
+ @AfterClass
+ public static void finish() {
+ zkServer.stopAndWait();
+ System.getProperties().remove("java.security.auth.login.config");
+ }
+
+ @Test (timeout = 30000)
+ public void testZKKafkaHang() throws Exception {
+ // This is a test for TWILL-139. Without the fix, there is a chance that the test times out.
+ ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/kafka").build();
+ EmbeddedKafkaServer kafka = new EmbeddedKafkaServer(generateKafkaConfig(zkClient.getConnectString()));
+
+ zkClient.startAndWait();
+ ZKOperations.ignoreError(zkClient.create("/", null, CreateMode.PERSISTENT),
+ KeeperException.NodeExistsException.class, null).get();
+ try {
+ try {
+ kafka.startAndWait();
+ } finally {
+ kafka.stopAndWait();
+ }
+ } finally {
+ zkClient.stopAndWait();
+ }
+ }
+
+ private Properties generateKafkaConfig(String kafkaZKConnect) throws IOException {
+ int port = Networks.getRandomPort();
+ Preconditions.checkState(port > 0, "Failed to get random port.");
+
+ Properties prop = new Properties();
+ prop.setProperty("log.dir", TMP_FOLDER.newFolder().getAbsolutePath());
+ prop.setProperty("port", Integer.toString(port));
+ prop.setProperty("broker.id", "1");
+ prop.setProperty("socket.send.buffer.bytes", "1048576");
+ prop.setProperty("socket.receive.buffer.bytes", "1048576");
+ prop.setProperty("socket.request.max.bytes", "104857600");
+ prop.setProperty("num.partitions", "1");
+ prop.setProperty("log.retention.hours", "24");
+ prop.setProperty("log.flush.interval.messages", "10000");
+ prop.setProperty("log.flush.interval.ms", "1000");
+ prop.setProperty("log.segment.bytes", "536870912");
+ prop.setProperty("zookeeper.connect", kafkaZKConnect);
+ prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
+ prop.setProperty("default.replication.factor", "1");
+ return prop;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5971eed1/twill-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/pom.xml b/twill-zookeeper/pom.xml
index 0d2edda..d116207 100644
--- a/twill-zookeeper/pom.xml
+++ b/twill-zookeeper/pom.xml
@@ -68,5 +68,10 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5971eed1/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index dc2bfa9..571203e 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -43,6 +43,7 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -179,17 +180,17 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
// Create the requested path again
Futures.addCallback(
doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() {
- @Override
- public void onSuccess(String pathResult) {
- result.set(pathResult);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // handle the failure
- updateFailureResult(t, result, path, ignoreNodeExists);
- }
- });
+ @Override
+ public void onSuccess(String pathResult) {
+ result.set(pathResult);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // handle the failure
+ updateFailureResult(t, result, path, ignoreNodeExists);
+ }
+ });
}
@Override
@@ -466,9 +467,7 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
}
try {
- if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) {
- LOG.debug("Connected to ZooKeeper: {}", zkStr);
- notifyStarted();
+ if (state == State.STARTING && doStartStateChange(event.getState())) {
return;
}
if (event.getState() == Event.KeeperState.Expired) {
@@ -503,6 +502,41 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
}
}
+ /**
+ * Changes the service state from STARTING to either RUNNING or FAILED.
+ *
+ * @return true if state changed
+ */
+ private boolean doStartStateChange(Event.KeeperState keeperState) {
+ // If failed to authenticate, make the starting of this service fail.
+ if (keeperState == Event.KeeperState.AuthFailed) {
+ notifyFailed(new KeeperException.AuthFailedException());
+ return true;
+ }
+ // If authentication passed, notify service started.
+ if (keeperState == Event.KeeperState.SaslAuthenticated) {
+ LOG.debug("Authenticated and connected to ZooKeeper: {}", zkStr);
+ notifyStarted();
+ return true;
+ }
+ // If SyncConnected without authentication, notify service started. If authentication is in process, don't
+ // notify, as we expect there will be SaslAuthenticated or AuthFailed event.
+ if (keeperState == Event.KeeperState.SyncConnected) {
+ ZooKeeperSaslClient saslClient = zooKeeper.get().getSaslClient();
+ if (saslClient == null) {
+ // Shouldn't be null. If null, it means authentication already failed earlier, meaning the AuthFailed event should
+ // have been triggered
+ notifyFailed(new KeeperException.AuthFailedException());
+ return true;
+ }
+ if (!saslClient.clientTunneledAuthenticationInProgress()) {
+ LOG.debug("Connected to ZooKeeper: {}", zkStr);
+ notifyStarted();
+ return true;
+ }
+ }
+ return false;
+ }
/**
* Creates a {@link Runnable} task that will get executed in the event executor for transiting this