You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by le...@apache.org on 2017/06/08 12:48:15 UTC
metron git commit: METRON-976 KafkaUtils doesn't handle
SASL_PLAINTEXT (justinleet via leet) closes apache/metron#600
Repository: metron
Updated Branches:
refs/heads/master 5b72da7be -> 61cbab46e
METRON-976 KafkaUtils doesn't handle SASL_PLAINTEXT (justinleet via leet) closes apache/metron#600
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/61cbab46
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/61cbab46
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/61cbab46
Branch: refs/heads/master
Commit: 61cbab46e3e262e006e59c28eaa9192d0f7aaaab
Parents: 5b72da7
Author: justinleet <ju...@gmail.com>
Authored: Thu Jun 8 08:47:27 2017 -0400
Committer: YOUR NAME as In Apache <YO...@apache.org>
Committed: Thu Jun 8 08:47:27 2017 -0400
----------------------------------------------------------------------
.../apache/metron/common/utils/KafkaUtils.java | 15 +-
.../common/utils/KafkaUtilsEndpointTest.java | 64 --------
.../metron/common/utils/KafkaUtilsTest.java | 162 +++++++++++++++++++
3 files changed, 173 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/61cbab46/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
index 04c1389..bbd8b30 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
@@ -44,6 +44,7 @@ public enum KafkaUtils {
framework.close();
}
}
+
public List<String> getBrokersFromZookeeper(CuratorFramework client) throws Exception {
List<String> ret = new ArrayList<>();
for(String id : client.getChildren().forPath("/brokers/ids")) {
@@ -68,12 +69,18 @@ public enum KafkaUtils {
return ret;
}
- public List<String> fromEndpoint(String url) throws URISyntaxException {
+ /*
+ The URL accepted is NOT a general URL, and is assumed to follow the format used by the Kafka structures in Zookeeper.
+ See: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
+ */
+ List<String> fromEndpoint(String url){
List<String> ret = new ArrayList<>();
if(url != null) {
- URI uri = new URI(url);
- int port = uri.getPort();
- ret.add(uri.getHost() + ((port > 0)?(":" + port):""));
+ Iterable<String> splits = Splitter.on("//").split(url);
+ if(Iterables.size(splits) == 2) {
+ String hostPort = Iterables.getLast(splits);
+ ret.add(hostPort);
+ }
}
return ret;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/61cbab46/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java
deleted file mode 100644
index 14a9e41..0000000
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.utils;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class KafkaUtilsEndpointTest {
- static String[] hostnames = new String[] { "node1", "localhost", "192.168.0.1", "my.domain.com" };
- static String[] schemes = new String[] { "SSL", "PLAINTEXTSASL", "PLAINTEXT"};
- static String[] ports = new String[] { "6667", "9091", null};
- private String endpoint;
- private String expected;
-
- public KafkaUtilsEndpointTest(String endpoint, String expected) {
- this.endpoint = endpoint;
- this.expected = expected;
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> data() {
- List<Object[]> ret = new ArrayList<>();
- for(String scheme : schemes) {
- for(String hostname : hostnames) {
- for(String port : ports) {
- port = port != null?(":" + port):"";
- String expected = hostname + port;
- ret.add(new Object[]{scheme + "://" + expected, expected });
- }
- }
- }
- return ret;
- }
-
- @Test
- public void testEndpointParsing() throws URISyntaxException {
- Assert.assertEquals(expected, KafkaUtils.INSTANCE.fromEndpoint(endpoint).get(0));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/61cbab46/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java
new file mode 100644
index 0000000..72ac51e
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(Enclosed.class)
+public class KafkaUtilsTest {
+ @RunWith(MockitoJUnitRunner.class)
+ public static class ZkMockedUtils {
+ @Mock
+ CuratorFramework client;
+ @Mock
+ GetChildrenBuilder childrenBuilder;
+ @Mock
+ GetDataBuilder dataBuilder;
+
+ /**
+ * {
+ * "host": "192.168.1.148",
+ * "port": 9092
+ * }
+ */
+ @Multiline
+ public static String brokerWithHostPort;
+
+ @Test
+ public void testGetEndpointsFromZookeeperHostPort() throws Exception {
+ ArrayList<String> brokerIds = new ArrayList<>();
+ brokerIds.add("1");
+
+ when(client.getChildren()).thenReturn(childrenBuilder);
+ when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds);
+ when(client.getData()).thenReturn(dataBuilder);
+ when(dataBuilder.forPath("/brokers/ids/1")).thenReturn(brokerWithHostPort.getBytes());
+
+ ArrayList<String> expected = new ArrayList<>();
+ expected.add("192.168.1.148:9092");
+ assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client)));
+ }
+
+ /**
+ * {
+ * "endpoints": ["PLAINTEXT://host1:9092", "SSL://host1:9093", "SASL_PLAINTEXT://host1:9094", "PLAINTEXTSASL://host1:9095"]
+ * }
+ */
+ @Multiline
+ public static String brokerWithEndpoints;
+
+ @Test
+ public void testGetEndpointsFromZookeeperEndpoints() throws Exception {
+ ArrayList<String> brokerIds = new ArrayList<>();
+ brokerIds.add("1");
+
+ when(client.getChildren()).thenReturn(childrenBuilder);
+ when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds);
+ when(client.getData()).thenReturn(dataBuilder);
+ when(dataBuilder.forPath("/brokers/ids/1")).thenReturn(brokerWithEndpoints.getBytes());
+
+ ArrayList<String> expected = new ArrayList<>();
+ expected.add("host1:9092");
+ expected.add("host1:9093");
+ expected.add("host1:9094");
+ expected.add("host1:9095");
+ assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client)));
+ }
+
+ /**
+ * {
+ * "host": "192.168.1.148",
+ * "port": 9092,
+ * "endpoints": ["PLAINTEXT://host1:9092", "SSL://host1:9093"]
+ * }
+ */
+ @Multiline
+ public static String brokerWithHostPortAndEndpoints;
+
+ @Test
+ public void testGetEndpointsFromZookeeperHostPortAndEndpoints() throws Exception {
+ ArrayList<String> brokerIds = new ArrayList<>();
+ brokerIds.add("1");
+
+ when(client.getChildren()).thenReturn(childrenBuilder);
+ when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds);
+ when(client.getData()).thenReturn(dataBuilder);
+ when(dataBuilder.forPath("/brokers/ids/1"))
+ .thenReturn(brokerWithHostPortAndEndpoints.getBytes());
+
+ ArrayList<String> expected = new ArrayList<>();
+ expected.add("192.168.1.148:9092");
+ assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client)));
+ }
+ }
+
+ @RunWith(Parameterized.class)
+ public static class ParameterizedEndPointParsing {
+ static String[] hostnames = new String[]{"node1", "localhost", "192.168.0.1", "my.domain.com"};
+ static String[] schemes = new String[]{"SSL", "PLAINTEXTSASL", "PLAINTEXT", "SASL_PLAINTEXT"};
+ static String[] ports = new String[]{"6667", "9091", null};
+
+ private String endpoint;
+ private String expected;
+
+ public ParameterizedEndPointParsing(String endpoint, String expected) {
+ this.endpoint = endpoint;
+ this.expected = expected;
+ }
+
+ @Parameters(name = "{index}:endpoint({0}={1})")
+ public static Collection<Object[]> data() {
+ List<Object[]> ret = new ArrayList<>();
+ for (String scheme : schemes) {
+ for (String hostname : hostnames) {
+ for (String port : ports) {
+ port = port != null ? (":" + port) : "";
+ String expected = hostname + port;
+ ret.add(new Object[]{scheme + "://" + expected, expected});
+ }
+ }
+ }
+ return ret;
+ }
+
+ @Test
+ public void testEndpointParsing() throws URISyntaxException {
+ assertEquals(expected, KafkaUtils.INSTANCE.fromEndpoint(endpoint).get(0));
+ }
+ }
+}