You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/06/12 17:51:35 UTC

samza git commit: SAMZA-1327: fail if namespace specified in the connection string does not exist

Repository: samza
Updated Branches:
  refs/heads/master 371473dd1 -> 1eb8fd472


SAMZA-1327: fail if namespace specified in the connection string does not exist

Author: Boris Shkolnik <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>

Reviewers: Xinyu Liu <xi...@apache.org>

Closes #218 from sborya/zkNameSpaceFail


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1eb8fd47
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1eb8fd47
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1eb8fd47

Branch: refs/heads/master
Commit: 1eb8fd472458ad1ba03ac4e95627203bc7b7a12f
Parents: 371473d
Author: Boris Shkolnik <bo...@apache.org>
Authored: Mon Jun 12 10:51:27 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon Jun 12 10:51:27 2017 -0700

----------------------------------------------------------------------
 .../samza/zk/ZkCoordinationServiceFactory.java  |  39 ++---
 .../org/apache/samza/zk/TestZkNamespace.java    | 153 +++++++++++++++++++
 .../java/org/apache/samza/zk/TestZkUtils.java   |  20 ---
 3 files changed, 174 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1eb8fd47/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index b3a2a6f..20fcfa4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -33,15 +33,11 @@ import org.slf4j.LoggerFactory;
 public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
   private final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class);
 
-  // TODO - Why should this method be synchronized?
-  synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
+  public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
     ZkConfig zkConfig = new ZkConfig(config);
 
-    ZkClient zkClient = createZkClient(zkConfig.getZkConnect(),
-        zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
-
-    // make sure the 'path' exists
-    createZkPath(zkConfig.getZkConnect(), zkClient);
+    ZkClient zkClient =
+        createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
 
     ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
 
@@ -56,31 +52,38 @@ public class ZkCoordinationServiceFactory implements CoordinationServiceFactory
    * @return zkClient object
    */
   public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) {
+    ZkClient zkClient;
     try {
-      return new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs);
+      zkClient = new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs);
     } catch (Exception e) {
       // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based.
       throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e);
     }
+
+    // make sure the namespace in zk exists (if specified)
+    validateZkNameSpace(connectString, zkClient);
+
+    return zkClient;
   }
 
   /**
-   * if ZkConnectString contains some path at the end, it needs to be created when connecting for the first time.
+   * Fails if the connect string ends with a namespace (chroot) path that does not exist in ZK.
    * @param zkConnect - connect string
    * @param zkClient - zkClient object to talk to the ZK
    */
-  public static void createZkPath(String zkConnect, ZkClient zkClient) {
+  public static void validateZkNameSpace(String zkConnect, ZkClient zkClient) {
     ConnectStringParser parser = new ConnectStringParser(zkConnect);
 
     String path = parser.getChrootPath();
-    LOG.info("path =" + path);
-    if (!Strings.isNullOrEmpty(path)) {
-      // create this path in zk
-      LOG.info("first connect. creating path =" + path + " in ZK " + parser.getServerAddresses());
-      if (!zkClient.exists(path)) {
-        zkClient.createPersistent(path, true); // will create parents if needed and will not throw exception if exists
-      }
+    if (Strings.isNullOrEmpty(path)) {
+      return; // no namespace path
     }
-  }
 
+    LOG.info("connectString = " + zkConnect + "; path =" + path);
+
+    // if a namespace is specified (path above) but "/" does not exist, we will fail
+    if (!zkClient.exists("/")) {
+      throw new SamzaException("Zookeeper namespace: " + path + " does not exist for zk at " + zkConnect);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1eb8fd47/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
new file mode 100644
index 0000000..3ce203e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import com.google.common.base.Strings;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.samza.SamzaException;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+// A zk namespace is similar to chroot in unix. It is defined in ZK, but the user doesn't see it.
+// For the user "/" is the root, but in the ZK tree it is actually host:port/namespace. If the namespace has not been
+// created, then accessing "/" by the user will fail.
+public class TestZkNamespace {
+  private static EmbeddedZookeeper zkServer = null;
+  private ZkClient zkClient = null;
+  private ZkClient zkClient1 = null;
+  private static final int SESSION_TIMEOUT_MS = 20000;
+  private static final int CONNECTION_TIMEOUT_MS = 10000;
+
+  @BeforeClass
+  public static void setup()
+      throws InterruptedException {
+    zkServer = new EmbeddedZookeeper();
+    zkServer.setup();
+  }
+
+  @AfterClass
+  public static void teardown() {
+    zkServer.teardown();
+  }
+
+  // for these tests we need to connect to zk multiple times
+  private void initZk(String zkConnect) {
+    try {
+      zkClient = new ZkClient(new ZkConnection(zkConnect, SESSION_TIMEOUT_MS), CONNECTION_TIMEOUT_MS);
+    } catch (Exception e) {
+      Assert.fail("Client connection setup failed for connect + " + zkConnect + ": " + e);
+    }
+  }
+
+  private void tearDownZk() {
+    if (zkClient != null) {
+      zkClient.close();
+    }
+
+    if (zkClient1 != null) {
+      zkClient1.close();
+    }
+  }
+
+  // create the namespace in zk before accessing it, using a separate client connected without the namespace
+  private void createNamespace(String pathToCreate) {
+    if (Strings.isNullOrEmpty(pathToCreate)) {
+      return;
+    }
+
+    String zkConnect = "127.0.0.1:" + zkServer.getPort();
+    try {
+      zkClient1 = new ZkClient(new ZkConnection(zkConnect, SESSION_TIMEOUT_MS), CONNECTION_TIMEOUT_MS);
+    } catch (Exception e) {
+      Assert.fail("Client connection setup failed. Aborting tests..");
+    }
+    zkClient1.createPersistent(pathToCreate, true);
+  }
+
+  // create namespace, create connection, validate the connection
+  private void testDoNotFailIfNameSpacePresent(String zkNameSpace) {
+    String zkConnect = "127.0.0.1:" + zkServer.getPort() + zkNameSpace;
+    createNamespace(zkNameSpace);
+    initZk(zkConnect);
+    ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+
+    zkClient.createPersistent("/test");
+    zkClient.createPersistent("/test/test1");
+
+    // test if the new root exists
+    Assert.assertTrue(zkClient.exists("/"));
+    Assert.assertTrue(zkClient.exists("/test"));
+    Assert.assertTrue(zkClient.exists("/test/test1"));
+  }
+
+  @Test
+  public void testValidateFailZkNameSpace1LevelPath() {
+    try {
+      String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace";
+      initZk(zkConnect);
+      ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+      Assert.fail("1.Should fail with exception, because namespace doesn't exist");
+    } catch (SamzaException e) {
+      // expected
+    } finally {
+      tearDownZk();
+    }
+  }
+
+  @Test
+  public void testValidateFailZkNameSpace2LevelPath() {
+    try {
+      String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace/xyz";
+      initZk(zkConnect);
+      ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+      Assert.fail("2.Should fail with exception, because namespace doesn't exist");
+    } catch (SamzaException e) {
+      // expected
+    } finally {
+      tearDownZk();
+    }
+  }
+
+  @Test
+  public void testValidateFailZkNameSpaceEmptyPath() {
+    // should succeed, because no namespace provided
+    String zkConnect = "127.0.0.1:" + zkServer.getPort() + "";
+    initZk(zkConnect);
+    ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient);
+    tearDownZk();
+  }
+
+  @Test
+  public void testValidateNotFailZkNameSpace() {
+    // now positive tests - with existing namespace
+    testDoNotFailIfNameSpacePresent("/zkNameSpace1");
+
+    testDoNotFailIfNameSpacePresent("/zkNameSpace1/xyz1");
+
+    testDoNotFailIfNameSpacePresent("");
+
+    tearDownZk();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/1eb8fd47/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 173b8a6..b7a0eb8 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -86,26 +86,6 @@ public class TestZkUtils {
     zkServer.teardown();
   }
 
-
-  @Test
-  public void testInitZkPath() {
-    String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1";
-    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
-
-    Assert.assertTrue(zkClient.exists("/samza1"));
-
-    zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1/samza2";
-    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
-
-    Assert.assertTrue(zkClient.exists("/samza1/samza2"));
-
-
-    zkConnect = "127.0.0.1:" + zkServer.getPort(); // empty path.
-    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
-
-    Assert.assertTrue(zkClient.exists("/"));
-  }
-
   @Test
   public void testRegisterProcessorId() {
     String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));