You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/05/09 22:58:24 UTC
samza git commit: SAMZA-1275: Kafka throws when users configure replication.factor for …
Repository: samza
Updated Branches:
refs/heads/master 279a2952d -> b815e6d91
SAMZA-1275: Kafka throws when users configure replication.factor for …
…Kafka default stream
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>, Xinyu Liu <xi...@linkedin.com>
Closes #176 from jmakes/samza-1275
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b815e6d9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b815e6d9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b815e6d9
Branch: refs/heads/master
Commit: b815e6d91c45171b98e8cda76bf25cf95967e285
Parents: 279a295
Author: Jacob Maes <jm...@linkedin.com>
Authored: Tue May 9 15:58:14 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Tue May 9 15:58:14 2017 -0700
----------------------------------------------------------------------
.../samza/system/kafka/KafkaStreamSpec.java | 33 ++++++++++-
.../samza/system/kafka/TestKafkaStreamSpec.java | 60 ++++++++++++++++++++
2 files changed, 91 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b815e6d9/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index 3255f70..8cf4923 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -22,14 +22,19 @@ package org.apache.samza.system.kafka;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import kafka.log.LogConfig;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Extends StreamSpec with the ability to easily get the topic replication factor.
*/
public class KafkaStreamSpec extends StreamSpec {
+ private static Logger LOG = LoggerFactory.getLogger(KafkaStreamSpec.class);
+
private static final int DEFAULT_REPLICATION_FACTOR = 2;
/**
@@ -62,6 +67,30 @@ public class KafkaStreamSpec extends StreamSpec {
}
/**
+ * Filter out properties from the original config that are not supported by Kafka.
+ * For example, we allow users to set replication.factor as a property of the streams
+ * and then parse it out so we can pass it separately as Kafka requires. But Kafka
+ * will also throw if replication.factor is passed as a property on a new topic.
+ *
+ * @param originalConfig The original config to filter
+ * @return The filtered config
+ */
+ private static Map<String, String> filterUnsupportedProperties(Map<String, String> originalConfig) {
+ Map<String, String> filteredConfig = new HashMap<>();
+ for (Map.Entry<String, String> entry: originalConfig.entrySet()) {
+ // Replication factor is dropped silently here; any other key unknown to LogConfig is dropped with a warning.
+ if (!KafkaConfig.TOPIC_REPLICATION_FACTOR().equals(entry.getKey())) {
+ if (LogConfig.configNames().contains(entry.getKey())) {
+ filteredConfig.put(entry.getKey(), entry.getValue());
+ } else {
+ LOG.warn("Property '{}' is not a valid Kafka topic config. It will be ignored.", entry.getKey());
+ }
+ }
+ }
+ return filteredConfig;
+ }
+
+ /**
* Converts any StreamSpec to a KafkaStreamSpec.
* If the original spec already is a KafkaStreamSpec, it is simply returned.
*
@@ -81,7 +110,7 @@ public class KafkaStreamSpec extends StreamSpec {
originalSpec.getSystemName(),
originalSpec.getPartitionCount(),
replicationFactor,
- mapToProperties(originalSpec.getConfig()));
+ mapToProperties(filterUnsupportedProperties(originalSpec.getConfig())));
}
/**
@@ -109,7 +138,7 @@ public class KafkaStreamSpec extends StreamSpec {
* @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the
* Samza System abstraction. See {@link org.apache.samza.system.SystemFactory}
*
- * @param partitionCount The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned.
+ * @param partitionCount The number of partitions for the stream. A value of {@code 1} indicates unpartitioned.
*
* @param replicationFactor The number of topic replicas in the Kafka cluster for durability.
*
http://git-wip-us.apache.org/repos/asf/samza/blob/b815e6d9/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
new file mode 100644
index 0000000..69345a3
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * See also the general StreamSpec tests in {@link org.apache.samza.runtime.TestAbstractApplicationRunner}
+ */
+public class TestKafkaStreamSpec {
+
+ @Test
+ public void testUnsupportedConfigStrippedFromProperties() {
+ StreamSpec genericSpec = new StreamSpec("dummyId", "dummyPhysicalName", "dummySystemName", ImmutableMap.of("segment.bytes", "4", "replication.factor", "7"));
+
+ // The generic spec exposes both entries, via the accessor and via the config map.
+ assertEquals("7", genericSpec.get("replication.factor"));
+ assertEquals("4", genericSpec.get("segment.bytes"));
+ Map<String, String> genericConfig = genericSpec.getConfig();
+ assertEquals("7", genericConfig.get("replication.factor"));
+ assertEquals("4", genericConfig.get("segment.bytes"));
+
+ // After conversion, segment.bytes is kept and replication.factor is absent.
+ KafkaStreamSpec kafkaSpec = KafkaStreamSpec.fromSpec(genericSpec);
+ assertNull(kafkaSpec.get("replication.factor"));
+ assertEquals("4", kafkaSpec.get("segment.bytes"));
+
+ // The same must hold for the Properties view ...
+ Properties kafkaProperties = kafkaSpec.getProperties();
+ assertNull(kafkaProperties.get("replication.factor"));
+ assertEquals("4", kafkaProperties.get("segment.bytes"));
+
+ // ... and for the config map view.
+ Map<String, String> kafkaConfig = kafkaSpec.getConfig();
+ assertNull(kafkaConfig.get("replication.factor"));
+ assertEquals("4", kafkaConfig.get("segment.bytes"));
+ }
+}