You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/10/04 21:02:37 UTC
samza git commit: SAMZA-1439: Address late review feedback from SAMZA-1434
Repository: samza
Updated Branches:
refs/heads/master fd7a57708 -> 8b3fe5d26
SAMZA-1439: Address late review feedback from SAMZA-1434
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Xinyu Liu <xi...@linkedin.com>
Closes #313 from jmakes/samza-1439
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8b3fe5d2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8b3fe5d2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8b3fe5d2
Branch: refs/heads/master
Commit: 8b3fe5d2666a97ee5b3571a3788e8fefd23f55fd
Parents: fd7a577
Author: Jacob Maes <jm...@linkedin.com>
Authored: Wed Oct 4 14:02:26 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Wed Oct 4 14:02:26 2017 -0700
----------------------------------------------------------------------
.../org/apache/samza/system/TestStreamSpec.java | 55 ++++++++++++++++++++
.../samza/execution/ExecutionPlanner.java | 13 +++--
.../org/apache/samza/execution/StreamEdge.java | 3 +-
.../samza/execution/TestExecutionPlanner.java | 42 +++++++++++----
.../system/hdfs/TestHdfsSystemConsumer.java | 10 ++--
.../samza/system/kafka/KafkaStreamSpec.java | 4 ++
.../samza/system/kafka/TestKafkaStreamSpec.java | 5 ++
7 files changed, 112 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-api/src/test/java/org/apache/samza/system/TestStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/system/TestStreamSpec.java b/samza-api/src/test/java/org/apache/samza/system/TestStreamSpec.java
new file mode 100644
index 0000000..8974877
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/system/TestStreamSpec.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestStreamSpec {
+ @Test
+ public void testBasicConstructor() {
+ StreamSpec streamSpec = new StreamSpec("dummyId", "dummyPhysicalName", "dummySystemName", 1);
+
+ assertEquals("dummyId", streamSpec.getId());
+ assertEquals("dummyPhysicalName", streamSpec.getPhysicalName());
+ assertEquals("dummySystemName", streamSpec.getSystemName());
+ assertEquals(1, streamSpec.getPartitionCount());
+
+ // SystemStream should use the physical name, not the streamId.
+ SystemStream systemStream = new SystemStream("dummySystemName", "dummyPhysicalName");
+ assertEquals(systemStream, streamSpec.toSystemStream());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidPartitionCount() {
+ new StreamSpec("dummyId", "dummyPhysicalName", "dummySystemName", -1); // negative partition count is rejected
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidStreamId() {
+ new StreamSpec("dummy.Id", "dummyPhysicalName", "dummySystemName", 0); // only the '.' in the id differs from a valid spec
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidSystemName() {
+ new StreamSpec("dummyId", "dummyPhysicalName", "dummy.System.Name", 0); // only the '.' in the system name differs
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 998ea1e..468aab9 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
public class ExecutionPlanner {
private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
- private static final int MAX_INFERRED_PARTITIONS = 256;
+ static final int MAX_INFERRED_PARTITIONS = 256;
private final Config config;
private final StreamManager streamManager;
@@ -255,10 +255,17 @@ public class ExecutionPlanner {
if (partitions < 0) {
// use the following simple algo to figure out the partitions
// partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
- // partition will be further bounded by MAX_INFERRED_PARTITIONS. This is important when running in hadoop.
+ // partition will be further bounded by MAX_INFERRED_PARTITIONS.
+ // This is important when running in hadoop where an HDFS input can have lots of files (partitions).
int maxInPartitions = maxPartition(jobGraph.getSources());
int maxOutPartitions = maxPartition(jobGraph.getSinks());
- partitions = Math.min(Math.max(maxInPartitions, maxOutPartitions), MAX_INFERRED_PARTITIONS);
+ partitions = Math.max(maxInPartitions, maxOutPartitions);
+
+ if (partitions > MAX_INFERRED_PARTITIONS) {
+ log.warn("Inferred intermediate stream partition count {} is greater than the max {}. Using the max.",
+ partitions, MAX_INFERRED_PARTITIONS); // log the inferred value before it is clamped below
+ partitions = MAX_INFERRED_PARTITIONS;
+ }
}
for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
if (edge.getPartitionCount() <= 0) {
http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index 792fde5..62d85f1 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -82,8 +82,7 @@ public class StreamEdge {
}
SystemStream getSystemStream() {
- StreamSpec spec = getStreamSpec();
- return new SystemStream(spec.getSystemName(), spec.getPhysicalName());
+ return getStreamSpec().toSystemStream(); // delegate so the system/physical-name mapping is defined once, in StreamSpec
}
String getFormattedSystemStream() {
http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index eb5ca7b..50b0a13 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -67,6 +67,7 @@ public class TestExecutionPlanner {
private StreamSpec input1;
private StreamSpec input2;
private StreamSpec input3;
+ private StreamSpec input4;
private StreamSpec output1;
private StreamSpec output2;
@@ -194,6 +195,7 @@ public class TestExecutionPlanner {
input1 = new StreamSpec("input1", "input1", "system1");
input2 = new StreamSpec("input2", "input2", "system2");
input3 = new StreamSpec("input3", "input3", "system2");
+ input4 = new StreamSpec("input4", "input4", "system1");
output1 = new StreamSpec("output1", "output1", "system1");
output2 = new StreamSpec("output2", "output2", "system2");
@@ -202,6 +204,7 @@ public class TestExecutionPlanner {
Map<String, Integer> system1Map = new HashMap<>();
system1Map.put("input1", 64);
system1Map.put("output1", 8);
+ system1Map.put("input4", ExecutionPlanner.MAX_INFERRED_PARTITIONS * 2);
Map<String, Integer> system2Map = new HashMap<>();
system2Map.put("input2", 16);
system2Map.put("input3", 32);
@@ -218,6 +221,7 @@ public class TestExecutionPlanner {
when(runner.getStreamSpec("input1")).thenReturn(input1);
when(runner.getStreamSpec("input2")).thenReturn(input2);
when(runner.getStreamSpec("input3")).thenReturn(input3);
+ when(runner.getStreamSpec("input4")).thenReturn(input4);
when(runner.getStreamSpec("output1")).thenReturn(output1);
when(runner.getStreamSpec("output2")).thenReturn(output2);
@@ -316,10 +320,10 @@ public class TestExecutionPlanner {
StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
ExecutionPlan plan = planner.plan(streamGraph);
List<JobConfig> jobConfigs = plan.getJobConfigs();
- assertEquals(jobConfigs.size(), 1);
+ assertEquals(1, jobConfigs.size());
// GCD of 8, 16, 1600 and 252 is 4
- assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "4");
+ assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
}
@Test
@@ -333,10 +337,10 @@ public class TestExecutionPlanner {
StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
ExecutionPlan plan = planner.plan(streamGraph);
List<JobConfig> jobConfigs = plan.getJobConfigs();
- assertEquals(jobConfigs.size(), 1);
+ assertEquals(1, jobConfigs.size());
// GCD of 8, 16, 1600 and 252 is 4
- assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "4");
+ assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
}
@@ -350,7 +354,7 @@ public class TestExecutionPlanner {
StreamGraphImpl streamGraph = createSimpleGraph();
ExecutionPlan plan = planner.plan(streamGraph);
List<JobConfig> jobConfigs = plan.getJobConfigs();
- assertEquals(jobConfigs.size(), 1);
+ assertEquals(1, jobConfigs.size());
assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS()));
}
@@ -365,8 +369,8 @@ public class TestExecutionPlanner {
StreamGraphImpl streamGraph = createSimpleGraph();
ExecutionPlan plan = planner.plan(streamGraph);
List<JobConfig> jobConfigs = plan.getJobConfigs();
- assertEquals(jobConfigs.size(), 1);
- assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "2000");
+ assertEquals(1, jobConfigs.size());
+ assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
}
@Test
@@ -377,7 +381,7 @@ public class TestExecutionPlanner {
// the partitions should be the same as input1
jobGraph.getIntermediateStreams().forEach(edge -> {
- assertTrue(edge.getPartitionCount() == 64); // max of input1 and output1
+ assertEquals(64, edge.getPartitionCount()); // max of input1 and output1
});
}
@@ -394,9 +398,27 @@ public class TestExecutionPlanner {
edge.setPartitionCount(16);
edges.add(edge);
- assertEquals(ExecutionPlanner.maxPartition(edges), 32);
+ assertEquals(32, ExecutionPlanner.maxPartition(edges));
edges = Collections.emptyList();
- assertEquals(ExecutionPlanner.maxPartition(edges), StreamEdge.PARTITIONS_UNKNOWN);
+ assertEquals(StreamEdge.PARTITIONS_UNKNOWN, ExecutionPlanner.maxPartition(edges));
+ }
+
+ @Test
+ public void testMaxPartitionLimit() throws Exception {
+ int partitionLimit = ExecutionPlanner.MAX_INFERRED_PARTITIONS;
+
+ ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+ StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+
+ MessageStream<KV<Object, Object>> inputStream = streamGraph.getInputStream("input4");
+ OutputStream<KV<Object, Object>> outputStream = streamGraph.getOutputStream("output1");
+ inputStream.partitionBy(m -> m.key, m -> m.value).map(kv -> kv).sendTo(outputStream);
+ JobGraph jobGraph = (JobGraph) planner.plan(streamGraph);
+
+ // input4 has 2 * MAX_INFERRED_PARTITIONS partitions, so the inferred intermediate count must be capped
+ jobGraph.getIntermediateStreams().forEach(edge -> {
+ assertEquals(partitionLimit, edge.getPartitionCount()); // capped at the limit, not max(input4, output1)
+ });
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
index 21afcb9..396b06b 100644
--- a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
+++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
@@ -107,11 +107,11 @@ public class TestHdfsSystemConsumer {
// verify events read from consumer
int eventsReceived = 0;
int totalEvents = (NUM_EVENTS + 1) * NUM_FILES; // one "End of Stream" event in the end
- int remainingRetires = 100;
+ int remainingRetries = 100; // bounds the poll loop so missing events fail the assertion below instead of hanging
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> overallResults = new HashMap<>();
- while (eventsReceived < totalEvents && remainingRetires > 0) {
- remainingRetires--;
- Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = systemConsumer.poll(systemStreamPartitionSet, 200);
+ while (eventsReceived < totalEvents && remainingRetries > 0) {
+ remainingRetries--;
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = systemConsumer.poll(systemStreamPartitionSet, 1000);
for(SystemStreamPartition ssp : result.keySet()) {
List<IncomingMessageEnvelope> messageEnvelopeList = result.get(ssp);
overallResults.putIfAbsent(ssp, new ArrayList<>());
@@ -122,7 +122,7 @@ public class TestHdfsSystemConsumer {
eventsReceived += messageEnvelopeList.size();
}
}
- Assert.assertEquals(eventsReceived, totalEvents);
+ Assert.assertEquals("Did not receive all the events. Retry counter = " + remainingRetries, totalEvents, eventsReceived);
Assert.assertEquals(NUM_FILES, overallResults.size());
overallResults.values().forEach(messages -> {
Assert.assertEquals(NUM_EVENTS + 1, messages.size());
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index fd53a45..a49c022 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -151,6 +151,10 @@ public class KafkaStreamSpec extends StreamSpec {
Properties properties) {
super(id, topicName, systemName, partitionCount, false, propertiesToMap(properties));
+ if (partitionCount < 1) {
+ throw new IllegalArgumentException("Parameter 'partitionCount' must be > 0");
+ }
+
if (replicationFactor <= 0) {
throw new IllegalArgumentException(
String.format("Replication factor %d must be greater than 0.", replicationFactor));
http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
index 5612704..1758bf0 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
@@ -57,4 +57,9 @@ public class TestKafkaStreamSpec {
assertNull(kafkaConfig.get("replication.factor"));
assertEquals("4", kafkaConfig.get("segment.bytes"));
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidPartitionCount() {
+ new KafkaStreamSpec("dummyId","dummyPhysicalName", "dummySystemName", 0);
+ }
}