You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2017/11/17 14:47:59 UTC

metron git commit: METRON-1291 Kafka produce REST endpoint does not work in a Kerberized cluster (merrimanr) closes apache/metron#826

Repository: metron
Updated Branches:
  refs/heads/master a6b5eddd1 -> fd4a6d164


METRON-1291 Kafka produce REST endpoint does not work in a Kerberized cluster (merrimanr) closes apache/metron#826


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fd4a6d16
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fd4a6d16
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fd4a6d16

Branch: refs/heads/master
Commit: fd4a6d16407476366afe339342c2ed0d6d88faf9
Parents: a6b5edd
Author: merrimanr <me...@gmail.com>
Authored: Fri Nov 17 08:47:46 2017 -0600
Committer: merrimanr <me...@apache.org>
Committed: Fri Nov 17 08:47:46 2017 -0600

----------------------------------------------------------------------
 .../apache/metron/rest/config/KafkaConfig.java  |  5 +-
 .../src/main/resources/application.yml          |  2 +
 .../metron/rest/config/KafkaConfigTest.java     | 75 ++++++++++++++++++++
 3 files changed, 81 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/fd4a6d16/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
index a2abbeb..a15c48f 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
@@ -86,7 +86,7 @@ public class KafkaConfig {
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) {
-      props.put("security.protocol", "SASL_PLAINTEXT");
+      props.put("security.protocol", environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY));
     }
     return props;
   }
@@ -108,6 +108,9 @@ public class KafkaConfig {
     producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     producerConfig.put("request.required.acks", 1);
+    if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) {
+      producerConfig.put("security.protocol", environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY));
+    }
     return producerConfig;
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/fd4a6d16/metron-interface/metron-rest/src/main/resources/application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/resources/application.yml b/metron-interface/metron-rest/src/main/resources/application.yml
index 452a91f..426effa 100644
--- a/metron-interface/metron-rest/src/main/resources/application.yml
+++ b/metron-interface/metron-rest/src/main/resources/application.yml
@@ -35,6 +35,8 @@ zookeeper:
       connection: 10000
 
 kafka:
+  security:
+    protocol: SASL_PLAINTEXT
   topics:
     escalation: escalation
 

http://git-wip-us.apache.org/repos/asf/metron/blob/fd4a6d16/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java
new file mode 100644
index 0000000..dab924f
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import org.apache.metron.rest.MetronRestConstants;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.core.env.Environment;
+
+/** Unit tests for the Kafka consumer and producer property maps built by {@link KafkaConfig}. */
+public class KafkaConfigTest {
+
+  private Environment environment;
+  private KafkaConfig kafkaConfig;
+
+  @Before
+  public void setUp() throws Exception {
+    environment = mock(Environment.class);
+    kafkaConfig = new KafkaConfig(environment);
+  }
+
+  @Test
+  public void kafkaConfigShouldProperlyReturnConsumerProperties() throws Exception {
+    when(environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)).thenReturn("broker urls");
+    when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false);
+    // Kerberos flag stubbed to false: no security protocol is expected in the consumer properties.
+    Map<String, Object> consumerProperties = kafkaConfig.consumerProperties();
+    assertEquals("broker urls", consumerProperties.get("bootstrap.servers"));
+    assertNull(consumerProperties.get("security.protocol"));
+
+    when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(true);
+    when(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)).thenReturn("kafka security protocol");
+    // Kerberos flag stubbed to true: the stubbed protocol value is expected to be passed through.
+    consumerProperties = kafkaConfig.consumerProperties();
+    assertEquals("kafka security protocol", consumerProperties.get("security.protocol"));
+  }
+
+  @Test
+  public void kafkaConfigShouldProperlyReturnProducerProperties() throws Exception {
+    when(environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)).thenReturn("broker urls");
+    when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false);
+    // Kerberos flag stubbed to false: no security protocol is expected in the producer properties.
+    Map<String, Object> producerProperties = kafkaConfig.producerProperties();
+    assertEquals("broker urls", producerProperties.get("bootstrap.servers"));
+    assertNull(producerProperties.get("security.protocol"));
+
+    when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(true);
+    when(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)).thenReturn("kafka security protocol");
+    // Re-query the producer config (not the consumer config) so the producer-side protocol wiring is what gets asserted.
+    producerProperties = kafkaConfig.producerProperties();
+    assertEquals("kafka security protocol", producerProperties.get("security.protocol"));
+  }
+
+}