You are viewing a plain text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/23 23:54:10 UTC
kafka git commit: KAFKA-3245: config for changelog replication factor
Repository: kafka
Updated Branches:
refs/heads/trunk 525f4c81d -> 878b78acb
KAFKA-3245: config for changelog replication factor
guozhangwang
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #948 from ymatsuda/changelog_topic_replication
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/878b78ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/878b78ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/878b78ac
Branch: refs/heads/trunk
Commit: 878b78acb68d0fa62eb3c8f21d572664ca2bcdd0
Parents: 525f4c8
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Feb 23 14:54:06 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Feb 23 14:54:06 2016 -0800
----------------------------------------------------------------------
.../org/apache/kafka/streams/StreamsConfig.java | 10 +
.../internals/InternalTopicManager.java | 228 +++++++++++++++++++
.../internals/StreamPartitionAssignor.java | 178 +--------------
3 files changed, 242 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/878b78ac/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index cf0684a..b835948 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -86,6 +86,10 @@ public class StreamsConfig extends AbstractConfig {
public static final String JOB_ID_CONFIG = "job.id";
public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
+ /** <code>replication.factor</code> */
+ public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+ public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the job.";
+
/** <code>key.serializer</code> */
public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -139,6 +143,11 @@ public class StreamsConfig extends AbstractConfig {
"/tmp/kafka-streams",
Importance.MEDIUM,
STATE_DIR_DOC)
+ .define(REPLICATION_FACTOR_CONFIG,
+ Type.INT,
+ 1,
+ Importance.MEDIUM,
+ REPLICATION_FACTOR_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, // required with no default value
Type.CLASS,
Importance.HIGH,
@@ -284,6 +293,7 @@ public class StreamsConfig extends AbstractConfig {
private void removeStreamsSpecificConfigs(Map<String, Object> props) {
props.remove(StreamsConfig.JOB_ID_CONFIG);
+ props.remove(StreamsConfig.REPLICATION_FACTOR_CONFIG);
props.remove(StreamsConfig.STATE_DIR_CONFIG);
props.remove(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
http://git-wip-us.apache.org/repos/asf/kafka/blob/878b78ac/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
new file mode 100644
index 0000000..3768260
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Creates and resizes internal topics by reading and writing topic metadata directly in ZooKeeper.
+ */
+public class InternalTopicManager {
+
+    private static final Logger log = LoggerFactory.getLogger(InternalTopicManager.class);
+
+    // TODO: the following ZK dependency should be removed after KIP-4
+    private static final String ZK_TOPIC_PATH = "/brokers/topics";
+    private static final String ZK_BROKER_PATH = "/brokers/ids";
+    private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
+
+    // Session and connection timeouts (ms) for the ZkClient created in the constructor.
+    private static final int ZK_SESSION_TIMEOUT_MS = 30 * 1000;
+    private static final int ZK_CONNECTION_TIMEOUT_MS = 30 * 1000;
+
+    // Pause between metadata checks while waiting for a requested topic deletion to take effect,
+    // so that makeReady() does not poll ZooKeeper in a tight loop.
+    private static final long DELETE_TOPIC_BACKOFF_MS = 100L;
+
+    // Shared mapper for (de)serializing the topic metadata JSON stored in ZooKeeper.
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private final ZkClient zkClient;
+
+    /** Stores ZooKeeper node data as UTF-8 encoded strings. */
+    private static class ZKStringSerializer implements ZkSerializer {
+
+        @Override
+        public byte[] serialize(Object data) {
+            try {
+                return ((String) data).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new AssertionError(e);
+            }
+        }
+
+        @Override
+        public Object deserialize(byte[] bytes) {
+            if (bytes == null)
+                return null;
+            try {
+                return new String(bytes, "UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new AssertionError(e);
+            }
+        }
+    }
+
+    /**
+     * Connects to ZooKeeper.
+     *
+     * @param zkConnect ZooKeeper connection string
+     */
+    public InternalTopicManager(String zkConnect) {
+        zkClient = new ZkClient(zkConnect, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, new ZKStringSerializer());
+    }
+
+    /**
+     * Closes the ZooKeeper client opened by the constructor. The manager must not be used afterwards.
+     */
+    public void close() {
+        zkClient.close();
+    }
+
+    /**
+     * Blocks until {@code topic} exists with exactly {@code numPartitions} partitions.
+     * A missing topic is created, a topic with too few partitions gets new partitions appended,
+     * and a topic with too many partitions gets a deletion request so it can be re-created once
+     * its metadata disappears.
+     *
+     * NOTE(review): the loop only exits once the partition count matches, so if a deletion request
+     * is never processed (e.g. topic deletion disabled on the brokers -- confirm) this never returns.
+     *
+     * @param topic             internal topic name
+     * @param numPartitions     required number of partitions
+     * @param replicationFactor requested replication factor; capped at the number of registered brokers
+     * @throws IllegalArgumentException if a topic must be created or extended and {@code replicationFactor} < 1
+     * @throws StreamsException if no brokers are registered, topic metadata is malformed or cannot be
+     *                          (de)serialized, or the thread is interrupted while waiting for a deletion
+     */
+    public void makeReady(String topic, int numPartitions, int replicationFactor) {
+        while (true) {
+            Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
+
+            if (topicMetadata == null) {
+                // topic does not exist: create it
+                try {
+                    createTopic(topic, numPartitions, replicationFactor);
+                } catch (ZkNodeExistsException e) {
+                    // created concurrently by someone else; re-read its metadata on the next iteration
+                }
+            } else if (topicMetadata.size() > numPartitions) {
+                // topic exists with more partitions than needed: delete it in order to re-create it
+                try {
+                    deleteTopic(topic);
+                } catch (ZkNodeExistsException e) {
+                    // deletion already requested; keep waiting for it to take effect
+                }
+                // deleteTopic() only writes a request node; the topic metadata goes away only once that
+                // request is processed elsewhere, so pause before re-checking instead of spinning on ZK
+                backOffWhileDeleting(topic);
+            } else if (topicMetadata.size() < numPartitions) {
+                // topic exists with fewer partitions than needed: add the missing ones
+                try {
+                    addPartitions(topic, numPartitions - topicMetadata.size(), replicationFactor, topicMetadata);
+                } catch (ZkNoNodeException e) {
+                    // topic node vanished concurrently; re-read its metadata on the next iteration
+                }
+            } else {
+                return;
+            }
+        }
+    }
+
+    /** Sleeps for DELETE_TOPIC_BACKOFF_MS, converting an interrupt into a StreamsException. */
+    private static void backOffWhileDeleting(String topic) {
+        try {
+            Thread.sleep(DELETE_TOPIC_BACKOFF_MS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new StreamsException("Interrupted while waiting for deletion of internal topic " + topic, e);
+        }
+    }
+
+    /**
+     * Reads the registered broker ids from ZooKeeper.
+     *
+     * @return broker ids in ascending order; empty if the broker registry node does not exist
+     */
+    private List<Integer> getBrokers() {
+        List<Integer> brokers = new ArrayList<>();
+        try {
+            for (String broker : zkClient.getChildren(ZK_BROKER_PATH)) {
+                brokers.add(Integer.parseInt(broker));
+            }
+        } catch (ZkNoNodeException e) {
+            // no broker registry yet; callers treat an empty list as "no brokers available".
+            // Letting this escape would be swallowed by makeReady() and cause an endless retry loop.
+        }
+        Collections.sort(brokers);
+
+        log.debug("Read brokers {} from ZK in partition assignor.", brokers);
+
+        return brokers;
+    }
+
+    /**
+     * Reads the partition-to-replicas assignment of {@code topic} from ZooKeeper.
+     *
+     * @param topic topic name
+     * @return map from partition id to its replica broker ids, or {@code null} if the topic node does not exist
+     * @throws StreamsException if the stored metadata cannot be parsed or has no partition assignment
+     */
+    @SuppressWarnings("unchecked")
+    public Map<Integer, List<Integer>> getTopicMetadata(String topic) {
+        String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
+
+        if (data == null) return null;
+
+        try {
+            Map<String, Object> dataMap = MAPPER.readValue(data, new TypeReference<Map<String, Object>>() {
+
+            });
+
+            // JSON object keys are always strings, so partition ids must be converted back to integers
+            // explicitly; a plain cast would silently hand out String keys typed as Integer.
+            Map<String, List<Integer>> rawPartitions = (Map<String, List<Integer>>) dataMap.get("partitions");
+            if (rawPartitions == null)
+                throw new StreamsException("Topic metadata in ZK for internal topic " + topic + " has no partition assignment");
+
+            Map<Integer, List<Integer>> partitions = new HashMap<>();
+            for (Map.Entry<String, List<Integer>> entry : rawPartitions.entrySet()) {
+                partitions.put(Integer.parseInt(entry.getKey()), entry.getValue());
+            }
+
+            log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
+
+            return partitions;
+        } catch (IOException e) {
+            throw new StreamsException("Error while reading topic metadata from ZK for internal topic " + topic, e);
+        }
+    }
+
+    /**
+     * Creates {@code topic} by writing its partition assignment to ZooKeeper.
+     *
+     * @param topic             topic name
+     * @param numPartitions     number of partitions to create
+     * @param replicationFactor requested replication factor; capped at the number of registered brokers
+     * @throws ZkNodeExistsException    if the topic node already exists
+     * @throws IllegalArgumentException if {@code replicationFactor} < 1
+     * @throws StreamsException         if no brokers are registered or the metadata cannot be serialized
+     */
+    public void createTopic(String topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException {
+        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
+
+        List<Integer> brokers = getBrokers();
+        int effectiveFactor = effectiveReplicationFactor(topic, replicationFactor, brokers.size());
+
+        Map<Integer, List<Integer>> assignment = new HashMap<>();
+        assignReplicas(assignment, brokers, 0, numPartitions, effectiveFactor);
+
+        String data = serializeAssignment(assignment, "Error while creating topic metadata in ZK for internal topic " + topic);
+
+        // try to write to ZK with open ACL
+        zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+    }
+
+    /**
+     * Requests deletion of {@code topic} by creating a node under the ZooKeeper delete-topics path.
+     *
+     * @param topic topic name
+     * @throws ZkNodeExistsException if a deletion request for the topic already exists
+     */
+    public void deleteTopic(String topic) throws ZkNodeExistsException {
+        log.debug("Deleting topic {} from ZK in partition assignor.", topic);
+
+        zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
+    }
+
+    /**
+     * Appends {@code numPartitions} partitions to {@code topic} and rewrites its assignment in ZooKeeper.
+     * New partitions are numbered from {@code existingAssignment.size()} onwards.
+     *
+     * @param topic              topic name
+     * @param numPartitions      number of partitions to add
+     * @param replicationFactor  requested replication factor; capped at the number of registered brokers
+     * @param existingAssignment current partition-to-replicas assignment of the topic
+     * @throws ZkNoNodeException        if the topic node no longer exists
+     * @throws IllegalArgumentException if {@code replicationFactor} < 1
+     * @throws StreamsException         if no brokers are registered or the metadata cannot be serialized
+     */
+    public void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) {
+        log.debug("Adding {} partitions to topic {} from ZK with existing partitions assigned as {} in partition assignor.", numPartitions, topic, existingAssignment);
+
+        List<Integer> brokers = getBrokers();
+        int effectiveFactor = effectiveReplicationFactor(topic, replicationFactor, brokers.size());
+
+        Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
+        // placement is keyed on the partition id, so the round-robin continues after the existing
+        // partitions instead of restarting at the first broker for every batch of new partitions
+        assignReplicas(newAssignment, brokers, existingAssignment.size(), numPartitions, effectiveFactor);
+
+        String data = serializeAssignment(newAssignment, "Error while updating topic metadata in ZK for internal topic " + topic);
+
+        zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
+    }
+
+    /**
+     * Validates the requested replication factor and caps it at the number of registered brokers.
+     *
+     * @throws IllegalArgumentException if {@code requested} < 1
+     * @throws StreamsException         if {@code numBrokers} is 0 (replica lists would be empty)
+     */
+    private static int effectiveReplicationFactor(String topic, int requested, int numBrokers) {
+        if (requested < 1)
+            throw new IllegalArgumentException("Replication factor for internal topic " + topic + " must be at least 1 but was " + requested);
+        if (numBrokers == 0)
+            throw new StreamsException("No brokers found in ZK; cannot assign replicas for internal topic " + topic);
+        if (numBrokers < requested) {
+            log.warn("Not enough brokers found. The replication factor is reduced from {} to {}", requested, numBrokers);
+            return numBrokers;
+        }
+        return requested;
+    }
+
+    /**
+     * Puts replica lists for partitions {@code firstPartition} .. {@code firstPartition + count - 1} into
+     * {@code assignment}. The first replica of partition p goes to broker index p mod #brokers (brokers in
+     * ascending id order); further replicas are spaced about #brokers / replicationFactor positions apart,
+     * which keeps them distinct because replicationFactor <= #brokers.
+     */
+    private static void assignReplicas(Map<Integer, List<Integer>> assignment, List<Integer> brokers,
+                                       int firstPartition, int count, int replicationFactor) {
+        int numBrokers = brokers.size();
+        for (int partition = firstPartition; partition < firstPartition + count; partition++) {
+            List<Integer> replicas = new ArrayList<>(replicationFactor);
+            for (int r = 0; r < replicationFactor; r++) {
+                int shift = r * numBrokers / replicationFactor;
+                replicas.add(brokers.get((partition + shift) % numBrokers));
+            }
+            assignment.put(partition, replicas);
+        }
+    }
+
+    /**
+     * Renders an assignment as the topic metadata JSON ({@code version} 1 plus {@code partitions}).
+     *
+     * @throws StreamsException with {@code errorMessage} if serialization fails
+     */
+    private static String serializeAssignment(Map<Integer, List<Integer>> assignment, String errorMessage) {
+        Map<String, Object> dataMap = new HashMap<>();
+        dataMap.put("version", 1);
+        dataMap.put("partitions", assignment);
+        try {
+            return MAPPER.writeValueAsString(dataMap);
+        } catch (JsonProcessingException e) {
+            throw new StreamsException(errorMessage, e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/878b78ac/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index e600cf7..f49601c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
@@ -35,19 +34,6 @@ import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.ZooDefs;
-
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.I0Itec.zkclient.ZkClient;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -100,132 +86,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
private Map<TaskId, Set<TopicPartition>> standbyTasks;
- // TODO: the following ZK dependency should be removed after KIP-4
- private static final String ZK_TOPIC_PATH = "/brokers/topics";
- private static final String ZK_BROKER_PATH = "/brokers/ids";
- private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
-
- private ZkClient zkClient;
-
- private class ZKStringSerializer implements ZkSerializer {
-
- @Override
- public byte[] serialize(Object data) {
- try {
- return ((String) data).getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new AssertionError(e);
- }
- }
-
- @Override
- public Object deserialize(byte[] bytes) {
- try {
- if (bytes == null)
- return null;
- else
- return new String(bytes, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new AssertionError(e);
- }
- }
- }
-
- private List<Integer> getBrokers() {
- List<Integer> brokers = new ArrayList<>();
- for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
- brokers.add(Integer.parseInt(broker));
- }
- Collections.sort(brokers);
-
- log.debug("Read brokers {} from ZK in partition assignor.", brokers);
-
- return brokers;
- }
-
- @SuppressWarnings("unchecked")
- private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
- String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
-
- if (data == null) return null;
-
- try {
- ObjectMapper mapper = new ObjectMapper();
-
- Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
-
- });
-
- Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
-
- log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
-
- return partitions;
- } catch (IOException e) {
- throw new StreamsException("Error while reading topic metadata from ZK for internal topic " + topic, e);
- }
- }
-
- private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
- log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
-
- // we always assign leaders to brokers starting at the first one with replication factor 1
- List<Integer> brokers = getBrokers();
-
- Map<Integer, List<Integer>> assignment = new HashMap<>();
- for (int i = 0; i < numPartitions; i++) {
- assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
- }
-
- // try to write to ZK with open ACL
- try {
- Map<String, Object> dataMap = new HashMap<>();
- dataMap.put("version", 1);
- dataMap.put("partitions", assignment);
-
- ObjectMapper mapper = new ObjectMapper();
- String data = mapper.writeValueAsString(dataMap);
-
- zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
- } catch (JsonProcessingException e) {
- throw new StreamsException("Error while creating topic metadata in ZK for internal topic " + topic, e);
- }
- }
-
- private void deleteTopic(String topic) throws ZkNodeExistsException {
- log.debug("Deleting topic {} from ZK in partition assignor.", topic);
-
- zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
- }
-
- private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
- log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
-
- // we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1
- List<Integer> brokers = getBrokers();
-
- int startIndex = existingAssignment.size();
-
- Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
-
- for (int i = 0; i < numPartitions; i++) {
- newAssignment.put(i + startIndex, Collections.singletonList(brokers.get(i + startIndex) % brokers.size()));
- }
-
- // try to write to ZK with open ACL
- try {
- Map<String, Object> dataMap = new HashMap<>();
- dataMap.put("version", 1);
- dataMap.put("partitions", newAssignment);
-
- ObjectMapper mapper = new ObjectMapper();
- String data = mapper.writeValueAsString(dataMap);
-
- zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
- } catch (JsonProcessingException e) {
- throw new StreamsException("Error while updating topic metadata in ZK for internal topic " + topic, e);
- }
- }
+ private InternalTopicManager internalTopicManager;
/**
* We need to have the PartitionAssignor and its StreamThread to be mutually accessible
@@ -255,7 +116,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
this.topicGroups = streamThread.builder.topicGroups();
if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG))
- zkClient = new ZkClient((String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
+ internalTopicManager = new InternalTopicManager((String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG));
}
@Override
@@ -411,7 +272,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
// if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions
- if (zkClient != null) {
+ if (internalTopicManager != null) {
log.debug("Starting to validate changelog topics in partition assignor.");
Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
@@ -428,38 +289,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
numPartitions = task.partition + 1;
}
- boolean topicNotReady = true;
-
- while (topicNotReady) {
- Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
-
- // if topic does not exist, create it
- if (topicMetadata == null) {
- try {
- createTopic(topic, numPartitions);
- } catch (ZkNodeExistsException e) {
- // ignore and continue
- }
- } else {
- if (topicMetadata.size() > numPartitions) {
- // else if topic exists with more #.partitions than needed, delete in order to re-create it
- try {
- deleteTopic(topic);
- } catch (ZkNodeExistsException e) {
- // ignore and continue
- }
- } else if (topicMetadata.size() < numPartitions) {
- // else if topic exists with less #.partitions than needed, add partitions
- try {
- addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
- } catch (ZkNoNodeException e) {
- // ignore and continue
- }
- }
-
- topicNotReady = false;
- }
- }
+ internalTopicManager.makeReady(topic, numPartitions, 1);
// wait until the topic metadata has been propagated to all brokers
List<PartitionInfo> partitions;