You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/06/12 17:51:35 UTC
samza git commit: SAMZA-1327: fail if namespace specified in the
connection string does not exist
Repository: samza
Updated Branches:
refs/heads/master 371473dd1 -> 1eb8fd472
SAMZA-1327: fail if namespace specified in the connection string does not exist
Author: Boris Shkolnik <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: Xinyu Liu <xi...@apache.org>
Closes #218 from sborya/zkNameSpaceFail
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1eb8fd47
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1eb8fd47
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1eb8fd47
Branch: refs/heads/master
Commit: 1eb8fd472458ad1ba03ac4e95627203bc7b7a12f
Parents: 371473d
Author: Boris Shkolnik <bo...@apache.org>
Authored: Mon Jun 12 10:51:27 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon Jun 12 10:51:27 2017 -0700
----------------------------------------------------------------------
.../samza/zk/ZkCoordinationServiceFactory.java | 39 ++---
.../org/apache/samza/zk/TestZkNamespace.java | 153 +++++++++++++++++++
.../java/org/apache/samza/zk/TestZkUtils.java | 20 ---
3 files changed, 174 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1eb8fd47/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index b3a2a6f..20fcfa4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -33,15 +33,11 @@ import org.slf4j.LoggerFactory;
public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
private final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class);
- // TODO - Why should this method be synchronized?
- synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
+ public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
ZkConfig zkConfig = new ZkConfig(config);
- ZkClient zkClient = createZkClient(zkConfig.getZkConnect(),
- zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
-
- // make sure the 'path' exists
- createZkPath(zkConfig.getZkConnect(), zkClient);
+ ZkClient zkClient =
+ createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
@@ -56,31 +52,38 @@ public class ZkCoordinationServiceFactory implements CoordinationServiceFactory
* @return zkClient object
*/
public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) {
+ ZkClient zkClient;
try {
- return new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs);
+ zkClient = new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs);
} catch (Exception e) {
// ZkClient constructor may throw a variety of different exceptions, not all of them Zk based.
throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e);
}
+
+ // make sure the namespace in zk exists (if specified)
+ validateZkNameSpace(connectString, zkClient);
+
+ return zkClient;
}
/**
- * if ZkConnectString contains some path at the end, it needs to be created when connecting for the first time.
+ * if ZkConnectString contains a namespace path at the end but that path does not exist, we should fail
* @param zkConnect - connect string
* @param zkClient - zkClient object to talk to the ZK
*/
- public static void createZkPath(String zkConnect, ZkClient zkClient) {
+ public static void validateZkNameSpace(String zkConnect, ZkClient zkClient) {
ConnectStringParser parser = new ConnectStringParser(zkConnect);
String path = parser.getChrootPath();
- LOG.info("path =" + path);
- if (!Strings.isNullOrEmpty(path)) {
- // create this path in zk
- LOG.info("first connect. creating path =" + path + " in ZK " + parser.getServerAddresses());
- if (!zkClient.exists(path)) {
- zkClient.createPersistent(path, true); // will create parents if needed and will not throw exception if exists
- }
+ if (Strings.isNullOrEmpty(path)) {
+ return; // no namespace path
}
- }
+ LOG.info("connectString = " + zkConnect + "; path =" + path);
+
+ // if a namespace is specified (path above) but "/" does not exist, we will fail
+ if (!zkClient.exists("/")) {
+ throw new SamzaException("Zookeeper namespace: " + path + " does not exist for zk at " + zkConnect);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1eb8fd47/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
new file mode 100644
index 0000000..3ce203e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import com.google.common.base.Strings;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.samza.SamzaException;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+// zk namespace is similar to chroot in unix. It is defined in ZK, but user doesn't see it.
+// For user "/" is the root, but in ZK tree it is actually host:port/namespace. If namespace is not created, then accessing
+// "/" by user will fail.
+public class TestZkNamespace {
+ private static EmbeddedZookeeper zkServer = null;
+ private ZkClient zkClient = null;
+ private ZkClient zkClient1 = null;
+ private static final int SESSION_TIMEOUT_MS = 20000;
+ private static final int CONNECTION_TIMEOUT_MS = 10000;
+
+ @BeforeClass
+ public static void setup()
+ throws InterruptedException {
+ zkServer = new EmbeddedZookeeper();
+ zkServer.setup();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ zkServer.teardown();
+ }
+
+ // for these tests we need to connect to zk multiple times
+ private void initZk(String zkConnect) {
+ try {
+ zkClient = new ZkClient(new ZkConnection(zkConnect, SESSION_TIMEOUT_MS), CONNECTION_TIMEOUT_MS);
+ } catch (Exception e) {
+ Assert.fail("Client connection setup failed for connect + " + zkConnect + ": " + e);
+ }
+ }
+
+ private void tearDownZk() {
+ if (zkClient != null) {
+ zkClient.close();
+ }
+
+ if (zkClient1 != null) {
+ zkClient1.close();
+ }
+ }
+
+ // create namespace for zk before accessing it, thus using a separate client
+ private void createNamespace(String pathToCreate) {
+ if (Strings.isNullOrEmpty(pathToCreate)) {
+ return;
+ }
+
+ String zkConnect = "127.0.0.1:" + zkServer.getPort();
+ try {
+ zkClient1 = new ZkClient(new ZkConnection(zkConnect, SESSION_TIMEOUT_MS), CONNECTION_TIMEOUT_MS);
+ } catch (Exception e) {
+ Assert.fail("Client connection setup failed. Aborting tests..");
+ }
+ zkClient1.createPersistent(pathToCreate, true);
+ }
+
+ // create namespace, create connection, validate the connection
+ private void testDoNotFailIfNameSpacePresent(String zkNameSpace) {
+ String zkConnect = "127.0.0.1:" + zkServer.getPort() + zkNameSpace;
+ createNamespace(zkNameSpace);
+ initZk(zkConnect);
+ ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+
+ zkClient.createPersistent("/test");
+ zkClient.createPersistent("/test/test1");
+
+ // test if the new root exists
+ Assert.assertTrue(zkClient.exists("/"));
+ Assert.assertTrue(zkClient.exists("/test"));
+ Assert.assertTrue(zkClient.exists("/test/test1"));
+ }
+
+ @Test
+ public void testValidateFailZkNameSpace1LevelPath() {
+ try {
+ String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace";
+ initZk(zkConnect);
+ ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+ Assert.fail("1.Should fail with exception, because namespace doesn't exist");
+ } catch (SamzaException e) {
+ // expected
+ } finally {
+ tearDownZk();
+ }
+ }
+
+ @Test
+ public void testValidateFailZkNameSpace2LevelPath() {
+ try {
+ String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace/xyz";
+ initZk(zkConnect);
+ ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+ Assert.fail("2.Should fail with exception, because namespace doesn't exist");
+ } catch (SamzaException e) {
+ // expected
+ } finally {
+ tearDownZk();
+ }
+ }
+
+ @Test
+ public void testValidateFailZkNameSpaceEmptyPath() {
+ // should succeed, because no namespace provided
+ String zkConnect = "127.0.0.1:" + zkServer.getPort() + "";
+ initZk(zkConnect);
+ ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+ tearDownZk();
+ }
+
+ @Test
+ public void testValidateNotFailZkNameSpace() {
+ // now positive tests - with existing namespace
+ testDoNotFailIfNameSpacePresent("/zkNameSpace1");
+
+ testDoNotFailIfNameSpacePresent("/zkNameSpace1/xyz1");
+
+ testDoNotFailIfNameSpacePresent("");
+
+ tearDownZk();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/1eb8fd47/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 173b8a6..b7a0eb8 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -86,26 +86,6 @@ public class TestZkUtils {
zkServer.teardown();
}
-
- @Test
- public void testInitZkPath() {
- String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1";
- ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
-
- Assert.assertTrue(zkClient.exists("/samza1"));
-
- zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1/samza2";
- ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
-
- Assert.assertTrue(zkClient.exists("/samza1/samza2"));
-
-
- zkConnect = "127.0.0.1:" + zkServer.getPort(); // empty path.
- ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
-
- Assert.assertTrue(zkClient.exists("/"));
- }
-
@Test
public void testRegisterProcessorId() {
String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));