You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2017/11/17 14:47:59 UTC
metron git commit: METRON-1291 Kafka produce REST endpoint does not
work in a Kerberized cluster (merrimanr) closes apache/metron#826
Repository: metron
Updated Branches:
refs/heads/master a6b5eddd1 -> fd4a6d164
METRON-1291 Kafka produce REST endpoint does not work in a Kerberized cluster (merrimanr) closes apache/metron#826
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fd4a6d16
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fd4a6d16
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fd4a6d16
Branch: refs/heads/master
Commit: fd4a6d16407476366afe339342c2ed0d6d88faf9
Parents: a6b5edd
Author: merrimanr <me...@gmail.com>
Authored: Fri Nov 17 08:47:46 2017 -0600
Committer: merrimanr <me...@apache.org>
Committed: Fri Nov 17 08:47:46 2017 -0600
----------------------------------------------------------------------
.../apache/metron/rest/config/KafkaConfig.java | 5 +-
.../src/main/resources/application.yml | 2 +
.../metron/rest/config/KafkaConfigTest.java | 75 ++++++++++++++++++++
3 files changed, 81 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/fd4a6d16/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
index a2abbeb..a15c48f 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
@@ -86,7 +86,7 @@ public class KafkaConfig {
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) {
- props.put("security.protocol", "SASL_PLAINTEXT");
+ props.put("security.protocol", environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY));
}
return props;
}
@@ -108,6 +108,9 @@ public class KafkaConfig {
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("request.required.acks", 1);
+ if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) {
+ producerConfig.put("security.protocol", environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY));
+ }
return producerConfig;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fd4a6d16/metron-interface/metron-rest/src/main/resources/application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/resources/application.yml b/metron-interface/metron-rest/src/main/resources/application.yml
index 452a91f..426effa 100644
--- a/metron-interface/metron-rest/src/main/resources/application.yml
+++ b/metron-interface/metron-rest/src/main/resources/application.yml
@@ -35,6 +35,8 @@ zookeeper:
connection: 10000
kafka:
+ security:
+ protocol: SASL_PLAINTEXT
topics:
escalation: escalation
http://git-wip-us.apache.org/repos/asf/metron/blob/fd4a6d16/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java
new file mode 100644
index 0000000..dab924f
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import org.apache.metron.rest.MetronRestConstants;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.core.env.Environment;
+
+public class KafkaConfigTest {
+
+ private Environment environment;
+ private KafkaConfig kafkaConfig;
+
+ @Before
+ public void setUp() throws Exception {
+ environment = mock(Environment.class);
+ kafkaConfig = new KafkaConfig(environment);
+ }
+
+ @Test
+ public void kafkaConfigShouldProperlyReturnConsumerProperties() throws Exception {
+ when(environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)).thenReturn("broker urls");
+ when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false);
+
+ Map<String, Object> consumerProperties = kafkaConfig.consumerProperties();
+ assertEquals("broker urls", consumerProperties.get("bootstrap.servers"));
+ assertNull(consumerProperties.get("security.protocol"));
+
+ when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(true);
+ when(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)).thenReturn("kafka security protocol");
+
+ consumerProperties = kafkaConfig.consumerProperties();
+ assertEquals("kafka security protocol", consumerProperties.get("security.protocol"));
+ }
+
+ @Test
+ public void kafkaConfigShouldProperlyReturnProducerProperties() throws Exception {
+ when(environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)).thenReturn("broker urls");
+ when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false);
+
+ Map<String, Object> producerProperties = kafkaConfig.producerProperties();
+ assertEquals("broker urls", producerProperties.get("bootstrap.servers"));
+ assertNull(producerProperties.get("security.protocol"));
+
+ when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(true);
+ when(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)).thenReturn("kafka security protocol");
+
+ producerProperties = kafkaConfig.consumerProperties();
+ assertEquals("kafka security protocol", producerProperties.get("security.protocol"));
+ }
+
+
+}