Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/23 23:54:10 UTC

kafka git commit: KAFKA-3245: config for changelog replication factor

Repository: kafka
Updated Branches:
  refs/heads/trunk 525f4c81d -> 878b78acb


KAFKA-3245: config for changelog replication factor

guozhangwang

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #948 from ymatsuda/changelog_topic_replication


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/878b78ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/878b78ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/878b78ac

Branch: refs/heads/trunk
Commit: 878b78acb68d0fa62eb3c8f21d572664ca2bcdd0
Parents: 525f4c8
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Feb 23 14:54:06 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Feb 23 14:54:06 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java |  10 +
 .../internals/InternalTopicManager.java         | 228 +++++++++++++++++++
 .../internals/StreamPartitionAssignor.java      | 178 +--------------
 3 files changed, 242 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/878b78ac/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index cf0684a..b835948 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -86,6 +86,10 @@ public class StreamsConfig extends AbstractConfig {
     public static final String JOB_ID_CONFIG = "job.id";
     public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
 
+    /** <code>replication.factor</code> */
+    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+    public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the job.";
+
     /** <code>key.serializer</code> */
     public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 
@@ -139,6 +143,11 @@ public class StreamsConfig extends AbstractConfig {
                                         "/tmp/kafka-streams",
                                         Importance.MEDIUM,
                                         STATE_DIR_DOC)
+                                .define(REPLICATION_FACTOR_CONFIG,
+                                        Type.INT,
+                                        1,
+                                        Importance.MEDIUM,
+                                        REPLICATION_FACTOR_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG,        // required with no default value
                                         Type.CLASS,
                                         Importance.HIGH,
@@ -284,6 +293,7 @@ public class StreamsConfig extends AbstractConfig {
 
     private void removeStreamsSpecificConfigs(Map<String, Object> props) {
         props.remove(StreamsConfig.JOB_ID_CONFIG);
+        props.remove(StreamsConfig.REPLICATION_FACTOR_CONFIG);
         props.remove(StreamsConfig.STATE_DIR_CONFIG);
         props.remove(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
         props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
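
The new setting is picked up like any other Streams config. A minimal sketch of a job opting
into it, under the assumption that the job id and ZooKeeper connect string are placeholders
(serializers, bootstrap servers, and other required settings are omitted for brevity):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ReplicationFactorConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.JOB_ID_CONFIG, "my-stream-job");             // also the changelog topic prefix
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); // lets the assignor manage internal topics
            props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);               // change log / repartition topic replication
            // Left unset, replication.factor falls back to the default of 1 defined above.
        }
    }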

http://git-wip-us.apache.org/repos/asf/kafka/blob/878b78ac/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
new file mode 100644
index 0000000..3768260
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InternalTopicManager {
+
+    private static final Logger log = LoggerFactory.getLogger(InternalTopicManager.class);
+
+    // TODO: the following ZK dependency should be removed after KIP-4
+    private static final String ZK_TOPIC_PATH = "/brokers/topics";
+    private static final String ZK_BROKER_PATH = "/brokers/ids";
+    private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
+
+    private final ZkClient zkClient;
+
+    private class ZKStringSerializer implements ZkSerializer {
+
+        @Override
+        public byte[] serialize(Object data) {
+            try {
+                return ((String) data).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new AssertionError(e);
+            }
+        }
+
+        @Override
+        public Object deserialize(byte[] bytes) {
+            try {
+                if (bytes == null)
+                    return null;
+                else
+                    return new String(bytes, "UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new AssertionError(e);
+            }
+        }
+    }
+
+    public InternalTopicManager(String zkConnect) {
+        zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
+    }
+
+    public void makeReady(String topic, int numPartitions, int replicationFactor) {
+        boolean topicNotReady = true;
+
+        while (topicNotReady) {
+            Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
+
+            if (topicMetadata == null) {
+                try {
+                    createTopic(topic, numPartitions, replicationFactor);
+                } catch (ZkNodeExistsException e) {
+                    // ignore and continue
+                }
+            } else {
+                if (topicMetadata.size() > numPartitions) {
+                    // else if topic exists with more #.partitions than needed, delete in order to re-create it
+                    try {
+                        deleteTopic(topic);
+                    } catch (ZkNodeExistsException e) {
+                        // ignore and continue
+                    }
+                } else if (topicMetadata.size() < numPartitions) {
+                    // else if topic exists with less #.partitions than needed, add partitions
+                    try {
+                        addPartitions(topic, numPartitions - topicMetadata.size(), replicationFactor, topicMetadata);
+                    } catch (ZkNoNodeException e) {
+                        // ignore and continue
+                    }
+                } else {
+                    topicNotReady = false;
+                }
+            }
+        }
+    }
+
+    private List<Integer> getBrokers() {
+        List<Integer> brokers = new ArrayList<>();
+        for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
+            brokers.add(Integer.parseInt(broker));
+        }
+        Collections.sort(brokers);
+
+        log.debug("Read brokers {} from ZK in partition assignor.", brokers);
+
+        return brokers;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<Integer, List<Integer>> getTopicMetadata(String topic) {
+        String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
+
+        if (data == null) return null;
+
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+
+            Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
+
+            });
+
+            Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
+
+            log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
+
+            return partitions;
+        } catch (IOException e) {
+            throw new StreamsException("Error while reading topic metadata from ZK for internal topic " + topic, e);
+        }
+    }
+
+    public void createTopic(String topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException {
+        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
+
+        List<Integer> brokers = getBrokers();
+        int numBrokers = brokers.size();
+        if (numBrokers < replicationFactor) {
+            log.warn("Not enough brokers found. The replication factor is reduced from " + replicationFactor + " to " +  numBrokers);
+            replicationFactor = numBrokers;
+        }
+
+        Map<Integer, List<Integer>> assignment = new HashMap<>();
+
+        for (int i = 0; i < numPartitions; i++) {
+            ArrayList<Integer> brokerList = new ArrayList<>();
+            for (int r = 0; r < replicationFactor; r++) {
+                int shift = r * numBrokers / replicationFactor;
+                brokerList.add(brokers.get((i + shift) % numBrokers));
+            }
+            assignment.put(i, brokerList);
+        }
+
+        // try to write to ZK with open ACL
+        try {
+            Map<String, Object> dataMap = new HashMap<>();
+            dataMap.put("version", 1);
+            dataMap.put("partitions", assignment);
+
+            ObjectMapper mapper = new ObjectMapper();
+            String data = mapper.writeValueAsString(dataMap);
+
+            zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+        } catch (JsonProcessingException e) {
+            throw new StreamsException("Error while creating topic metadata in ZK for internal topic " + topic, e);
+        }
+    }
+
+    public void deleteTopic(String topic) throws ZkNodeExistsException {
+        log.debug("Deleting topic {} from ZK in partition assignor.", topic);
+
+        zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
+    }
+
+    public void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) {
+        log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
+
+        List<Integer> brokers = getBrokers();
+        int numBrokers = brokers.size();
+        if (numBrokers < replicationFactor) {
+            log.warn("Not enough brokers found. The replication factor is reduced from " + replicationFactor + " to " +  numBrokers);
+            replicationFactor = numBrokers;
+        }
+
+        int startIndex = existingAssignment.size();
+
+        Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
+
+        for (int i = 0; i < numPartitions; i++) {
+            ArrayList<Integer> brokerList = new ArrayList<>();
+            for (int r = 0; r < replicationFactor; r++) {
+                int shift = r * numBrokers / replicationFactor;
+                brokerList.add(brokers.get((i + shift) % numBrokers));
+            }
+            newAssignment.put(i + startIndex, brokerList);
+        }
+
+        // try to write to ZK with open ACL
+        try {
+            Map<String, Object> dataMap = new HashMap<>();
+            dataMap.put("version", 1);
+            dataMap.put("partitions", newAssignment);
+
+            ObjectMapper mapper = new ObjectMapper();
+            String data = mapper.writeValueAsString(dataMap);
+
+            zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
+        } catch (JsonProcessingException e) {
+            throw new StreamsException("Error while updating topic metadata in ZK for internal topic " + topic, e);
+        }
+    }
+
+}
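
For reference, the replica placement in createTopic() and addPartitions() above spreads leaders
round-robin over the sorted broker ids and staggers followers by shift = r * numBrokers / replicationFactor,
so that replicas of a partition land on distinct brokers whenever enough brokers exist. A small
self-contained sketch of that loop, with made-up broker ids (4 partitions, 3 brokers, replication factor 2):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class AssignmentSketch {
        public static void main(String[] args) {
            List<Integer> brokers = Arrays.asList(0, 1, 2);   // hypothetical broker ids read from ZK
            int numPartitions = 4;
            int replicationFactor = 2;
            int numBrokers = brokers.size();

            Map<Integer, List<Integer>> assignment = new HashMap<>();
            for (int i = 0; i < numPartitions; i++) {
                List<Integer> brokerList = new ArrayList<>();
                for (int r = 0; r < replicationFactor; r++) {
                    int shift = r * numBrokers / replicationFactor;   // same offset rule as in createTopic()
                    brokerList.add(brokers.get((i + shift) % numBrokers));
                }
                assignment.put(i, brokerList);
            }
            System.out.println(assignment);   // expected: {0=[0, 1], 1=[1, 2], 2=[2, 0], 3=[0, 1]}
        }
    }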

http://git-wip-us.apache.org/repos/asf/kafka/blob/878b78ac/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index e600cf7..f49601c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
@@ -35,19 +34,6 @@ import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.zookeeper.ZooDefs;
-
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.I0Itec.zkclient.ZkClient;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -100,132 +86,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
 
-    // TODO: the following ZK dependency should be removed after KIP-4
-    private static final String ZK_TOPIC_PATH = "/brokers/topics";
-    private static final String ZK_BROKER_PATH = "/brokers/ids";
-    private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
-
-    private ZkClient zkClient;
-
-    private class ZKStringSerializer implements ZkSerializer {
-
-        @Override
-        public byte[] serialize(Object data) {
-            try {
-                return ((String) data).getBytes("UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new AssertionError(e);
-            }
-        }
-
-        @Override
-        public Object deserialize(byte[] bytes) {
-            try {
-                if (bytes == null)
-                    return null;
-                else
-                    return new String(bytes, "UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new AssertionError(e);
-            }
-        }
-    }
-
-    private List<Integer> getBrokers() {
-        List<Integer> brokers = new ArrayList<>();
-        for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
-            brokers.add(Integer.parseInt(broker));
-        }
-        Collections.sort(brokers);
-
-        log.debug("Read brokers {} from ZK in partition assignor.", brokers);
-
-        return brokers;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
-        String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
-
-        if (data == null) return null;
-
-        try {
-            ObjectMapper mapper = new ObjectMapper();
-
-            Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
-
-            });
-
-            Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
-
-            log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
-
-            return partitions;
-        } catch (IOException e) {
-            throw new StreamsException("Error while reading topic metadata from ZK for internal topic " + topic, e);
-        }
-    }
-
-    private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
-        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
-
-        // we always assign leaders to brokers starting at the first one with replication factor 1
-        List<Integer> brokers = getBrokers();
-
-        Map<Integer, List<Integer>> assignment = new HashMap<>();
-        for (int i = 0; i < numPartitions; i++) {
-            assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
-        }
-
-        // try to write to ZK with open ACL
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("partitions", assignment);
-
-            ObjectMapper mapper = new ObjectMapper();
-            String data = mapper.writeValueAsString(dataMap);
-
-            zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-        } catch (JsonProcessingException e) {
-            throw new StreamsException("Error while creating topic metadata in ZK for internal topic " + topic, e);
-        }
-    }
-
-    private void deleteTopic(String topic) throws ZkNodeExistsException {
-        log.debug("Deleting topic {} from ZK in partition assignor.", topic);
-
-        zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
-    }
-
-    private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
-        log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
-
-        // we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1
-        List<Integer> brokers = getBrokers();
-
-        int startIndex = existingAssignment.size();
-
-        Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
-
-        for (int i = 0; i < numPartitions; i++) {
-            newAssignment.put(i + startIndex, Collections.singletonList(brokers.get(i + startIndex) % brokers.size()));
-        }
-
-        // try to write to ZK with open ACL
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("partitions", newAssignment);
-
-            ObjectMapper mapper = new ObjectMapper();
-            String data = mapper.writeValueAsString(dataMap);
-
-            zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
-        } catch (JsonProcessingException e) {
-            throw new StreamsException("Error while updating topic metadata in ZK for internal topic " + topic, e);
-        }
-    }
+    private InternalTopicManager internalTopicManager;
 
     /**
      * We need to have the PartitionAssignor and its StreamThread to be mutually accessible
@@ -255,7 +116,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         this.topicGroups = streamThread.builder.topicGroups();
 
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG))
-            zkClient = new ZkClient((String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
+            internalTopicManager = new InternalTopicManager((String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG));
     }
 
     @Override
@@ -411,7 +272,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         // if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions
-        if (zkClient != null) {
+        if (internalTopicManager != null) {
             log.debug("Starting to validate changelog topics in partition assignor.");
 
             Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
@@ -428,38 +289,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                         numPartitions = task.partition + 1;
                 }
 
-                boolean topicNotReady = true;
-
-                while (topicNotReady) {
-                    Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
-
-                    // if topic does not exist, create it
-                    if (topicMetadata == null) {
-                        try {
-                            createTopic(topic, numPartitions);
-                        } catch (ZkNodeExistsException e) {
-                            // ignore and continue
-                        }
-                    } else {
-                        if (topicMetadata.size() > numPartitions) {
-                            // else if topic exists with more #.partitions than needed, delete in order to re-create it
-                            try {
-                                deleteTopic(topic);
-                            } catch (ZkNodeExistsException e) {
-                                // ignore and continue
-                            }
-                        } else if (topicMetadata.size() < numPartitions) {
-                            // else if topic exists with less #.partitions than needed, add partitions
-                            try {
-                                addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
-                            } catch (ZkNoNodeException e) {
-                                // ignore and continue
-                            }
-                        }
-
-                        topicNotReady = false;
-                    }
-                }
+                internalTopicManager.makeReady(topic, numPartitions, 1);
 
                 // wait until the topic metadata has been propagated to all brokers
                 List<PartitionInfo> partitions;
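
With the extraction above, the assignor's per-topic ZK loop reduces to a single call into the new
class. A minimal sketch of that call path, assuming placeholder values for the connect string and
changelog topic name and the replication factor of 1 shown in the hunk above (in practice the
assignor, not application code, drives this internal class):

    import org.apache.kafka.streams.processor.internals.InternalTopicManager;

    public class MakeReadySketch {
        public static void main(String[] args) {
            // Built from the zookeeper.connect value, as configure() does above.
            InternalTopicManager manager = new InternalTopicManager("localhost:2181");
            // Create or resize the internal topic: topic name, number of partitions, replication factor.
            manager.makeReady("my-stream-job-store1-changelog", 4, 1);
        }
    }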