You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ma...@apache.org on 2017/06/26 17:27:32 UTC

[11/18] metron git commit: METRON-976 KafkaUtils doesn't handle SASL_PLAINTEXT (justinleet via leet) closes apache/metron#600

METRON-976 KafkaUtils doesn't handle SASL_PLAINTEXT (justinleet via leet) closes apache/metron#600


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/61cbab46
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/61cbab46
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/61cbab46

Branch: refs/heads/Metron_0.4.0
Commit: 61cbab46e3e262e006e59c28eaa9192d0f7aaaab
Parents: 5b72da7
Author: justinleet <ju...@gmail.com>
Authored: Thu Jun 8 08:47:27 2017 -0400
Committer: YOUR NAME as In Apache <YO...@apache.org>
Committed: Thu Jun 8 08:47:27 2017 -0400

----------------------------------------------------------------------
 .../apache/metron/common/utils/KafkaUtils.java  |  15 +-
 .../common/utils/KafkaUtilsEndpointTest.java    |  64 --------
 .../metron/common/utils/KafkaUtilsTest.java     | 162 +++++++++++++++++++
 3 files changed, 173 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/61cbab46/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
index 04c1389..bbd8b30 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
@@ -44,6 +44,7 @@ public enum KafkaUtils {
       framework.close();
     }
   }
+
   public List<String> getBrokersFromZookeeper(CuratorFramework client) throws Exception {
     List<String> ret = new ArrayList<>();
     for(String id : client.getChildren().forPath("/brokers/ids")) {
@@ -68,12 +69,18 @@ public enum KafkaUtils {
     return ret;
   }
 
-  public List<String> fromEndpoint(String url) throws URISyntaxException {
+  /**
+   * Returns what follows "//" in the endpoint ("SSL://host1:9093" gives "host1:9093"); the list is empty if url is null or does not hold exactly one "//".
+   * The URL accepted is NOT a general URL, and is assumed to follow the format used by the Kafka structures in Zookeeper, see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
+   */
+  List<String> fromEndpoint(String url){
     List<String> ret = new ArrayList<>();
     if(url != null) {
-      URI uri = new URI(url);
-      int port = uri.getPort();
-      ret.add(uri.getHost() + ((port > 0)?(":" + port):""));
+      Iterable<String> splits = Splitter.on("//").split(url);
+      if(Iterables.size(splits) == 2) { // exactly one "//": the part before it and the host[:port] part after it
+        String hostPort = Iterables.getLast(splits);
+        ret.add(hostPort);
+      }
     }
     return ret;
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/61cbab46/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java
deleted file mode 100644
index 14a9e41..0000000
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.utils;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class KafkaUtilsEndpointTest {
-  static String[] hostnames = new String[] { "node1", "localhost", "192.168.0.1", "my.domain.com" };
-  static String[] schemes = new String[] { "SSL", "PLAINTEXTSASL", "PLAINTEXT"};
-  static String[] ports = new String[] { "6667", "9091", null};
-  private String endpoint;
-  private String expected;
-
-  public KafkaUtilsEndpointTest(String endpoint, String expected) {
-    this.endpoint = endpoint;
-    this.expected = expected;
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() {
-    List<Object[]> ret = new ArrayList<>();
-    for(String scheme : schemes) {
-      for(String hostname : hostnames) {
-        for(String port : ports) {
-          port = port != null?(":" + port):"";
-          String expected = hostname + port;
-          ret.add(new Object[]{scheme + "://" + expected, expected });
-        }
-      }
-    }
-    return ret;
-  }
-
-  @Test
-  public void testEndpointParsing() throws URISyntaxException {
-    Assert.assertEquals(expected, KafkaUtils.INSTANCE.fromEndpoint(endpoint).get(0));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/61cbab46/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java
new file mode 100644
index 0000000..72ac51e
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(Enclosed.class)
+public class KafkaUtilsTest {
+  @RunWith(MockitoJUnitRunner.class)
+  public static class ZkMockedUtils { // exercises getBrokersFromZookeeper against mocked Curator objects
+    @Mock
+    CuratorFramework client;
+    @Mock
+    GetChildrenBuilder childrenBuilder;
+    @Mock
+    GetDataBuilder dataBuilder;
+
+    /**
+     * {
+     * "host": "192.168.1.148",
+     * "port": 9092
+     * }
+     */
+    @Multiline
+    public static String brokerWithHostPort;
+    // NOTE(review): the Javadoc above each @Multiline field is presumably that field's JSON value - do not reword it.
+    @Test
+    public void testGetEndpointsFromZookeeperHostPort() throws Exception {
+      ArrayList<String> brokerIds = new ArrayList<>();
+      brokerIds.add("1");
+      // Stub the client: /brokers/ids lists broker "1", and /brokers/ids/1 returns brokerWithHostPort.
+      when(client.getChildren()).thenReturn(childrenBuilder);
+      when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds);
+      when(client.getData()).thenReturn(dataBuilder);
+      when(dataBuilder.forPath("/brokers/ids/1")).thenReturn(brokerWithHostPort.getBytes());
+      // A node holding only "host" and "port" is expected to come back as a single "host:port" entry.
+      ArrayList<String> expected = new ArrayList<>();
+      expected.add("192.168.1.148:9092");
+      assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client)));
+    }
+
+    /**
+     * {
+     * "endpoints": ["PLAINTEXT://host1:9092", "SSL://host1:9093", "SASL_PLAINTEXT://host1:9094", "PLAINTEXTSASL://host1:9095"]
+     * }
+     */
+    @Multiline
+    public static String brokerWithEndpoints;
+
+    @Test
+    public void testGetEndpointsFromZookeeperEndpoints() throws Exception {
+      ArrayList<String> brokerIds = new ArrayList<>();
+      brokerIds.add("1");
+      // Same stubbing as above, but the broker node carries only an "endpoints" array.
+      when(client.getChildren()).thenReturn(childrenBuilder);
+      when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds);
+      when(client.getData()).thenReturn(dataBuilder);
+      when(dataBuilder.forPath("/brokers/ids/1")).thenReturn(brokerWithEndpoints.getBytes());
+      // One entry per endpoint, in array order, without the scheme prefix (SASL_PLAINTEXT included).
+      ArrayList<String> expected = new ArrayList<>();
+      expected.add("host1:9092");
+      expected.add("host1:9093");
+      expected.add("host1:9094");
+      expected.add("host1:9095");
+      assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client)));
+    }
+
+    /**
+     * {
+     * "host": "192.168.1.148",
+     * "port": 9092,
+     * "endpoints": ["PLAINTEXT://host1:9092", "SSL://host1:9093"]
+     * }
+     */
+    @Multiline
+    public static String brokerWithHostPortAndEndpoints;
+
+    @Test
+    public void testGetEndpointsFromZookeeperHostPortAndEndpoints() throws Exception {
+      ArrayList<String> brokerIds = new ArrayList<>();
+      brokerIds.add("1");
+      // Here the broker node carries "host"/"port" as well as an "endpoints" array.
+      when(client.getChildren()).thenReturn(childrenBuilder);
+      when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds);
+      when(client.getData()).thenReturn(dataBuilder);
+      when(dataBuilder.forPath("/brokers/ids/1"))
+          .thenReturn(brokerWithHostPortAndEndpoints.getBytes());
+      // Only the host/port pair is expected; the endpoint entries must not show up in the result.
+      ArrayList<String> expected = new ArrayList<>();
+      expected.add("192.168.1.148:9092");
+      assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client)));
+    }
+  }
+
+  @RunWith(Parameterized.class)
+  public static class ParameterizedEndPointParsing {
+    static final String[] hostnames = new String[]{"node1", "localhost", "192.168.0.1", "my.domain.com"};
+    static final String[] schemes = new String[]{"SSL", "PLAINTEXTSASL", "PLAINTEXT", "SASL_PLAINTEXT"};
+    static final String[] ports = new String[]{"6667", "9091", null};
+
+    // One parameter set: the endpoint string handed to fromEndpoint and the host[:port] it must produce.
+    private final String endpoint;
+    private final String expected;
+    public ParameterizedEndPointParsing(String endpoint, String expected) {
+      this.endpoint = endpoint;
+      this.expected = expected;
+    }
+    // Builds every scheme x hostname x port combination; a null port gives an endpoint without ":port".
+    @Parameters(name = "{index}:endpoint({0}={1})")
+    public static Collection<Object[]> data() {
+      List<Object[]> ret = new ArrayList<>();
+      for (String scheme : schemes) {
+        for (String hostname : hostnames) {
+          for (String port : ports) {
+            String suffix = port != null ? (":" + port) : "";
+            String hostPort = hostname + suffix;
+            ret.add(new Object[]{scheme + "://" + hostPort, hostPort});
+          }
+        }
+      }
+      return ret;
+    }
+    // Whole-list comparison: a failed parse shows up as an assertion diff, not an IndexOutOfBoundsException from get(0).
+    @Test
+    public void testEndpointParsing() {
+      assertEquals(java.util.Collections.singletonList(expected), KafkaUtils.INSTANCE.fromEndpoint(endpoint));
+    }
+  }
+}