You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/04 21:54:03 UTC

[1/2] samza git commit: SAMZA-1989: SystemStreamGrouper interface change for SEP-5

Repository: samza
Updated Branches:
  refs/heads/master f4ba9cdc1 -> 8a22e05c9


http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
index 2bf6cee..385083b 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
@@ -21,88 +21,265 @@ package org.apache.samza.container.grouper.stream;
 
 import static org.junit.Assert.*;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestGroupBySystemStreamPartition {
-  SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0));
-  SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1));
-  SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2));
-  SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0));
-  GroupBySystemStreamPartitionFactory grouperFactory = new GroupBySystemStreamPartitionFactory();
+  private SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0));
+  private SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1));
+  private SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2));
+  private SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0));
+  private GroupBySystemStreamPartitionFactory grouperFactory = new GroupBySystemStreamPartitionFactory();
 
   @Test
   public void testLocalStreamGroupedCorrectly() {
-    HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
-    HashMap<String, String> configMap = new HashMap<String, String>();
-    Config config = new MapConfig(configMap);
-
-    SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config);
-    Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(allSSPs);
+    SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(new MapConfig());
+    Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(new HashSet<>());
     assertTrue(emptyResult.isEmpty());
 
-    Collections.addAll(allSSPs, aa0, aa1, aa2, ac0);
-    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
-    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ac0));
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+                .put(new TaskName(aa0.toString()), ImmutableSet.of(aa0))
+                .put(new TaskName(aa1.toString()), ImmutableSet.of(aa1))
+                .put(new TaskName(aa2.toString()), ImmutableSet.of(aa2))
+                .put(new TaskName(ac0.toString()), ImmutableSet.of(ac0))
+                .build();
 
-    HashSet<SystemStreamPartition> partitionaa0 = new HashSet<SystemStreamPartition>();
-    partitionaa0.add(aa0);
-    expectedResult.put(new TaskName(aa0.toString()), partitionaa0);
+    assertEquals(expectedResult, result);
+  }
 
-    HashSet<SystemStreamPartition> partitionaa1 = new HashSet<SystemStreamPartition>();
-    partitionaa1.add(aa1);
-    expectedResult.put(new TaskName(aa1.toString()), partitionaa1);
+  @Test
+  public void testBroadcastStreamGroupedCorrectly() {
+    Config config = new MapConfig(ImmutableMap.of("task.broadcast.inputs", "SystemA.StreamA#0"));
 
-    HashSet<SystemStreamPartition> partitionaa2 = new HashSet<SystemStreamPartition>();
-    partitionaa2.add(aa2);
-    expectedResult.put(new TaskName(aa2.toString()), partitionaa2);
+    SystemStreamPartitionGrouper grouper = new GroupBySystemStreamPartition(config);
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ac0));
 
-    HashSet<SystemStreamPartition> partitionac0 = new HashSet<SystemStreamPartition>();
-    partitionac0.add(ac0);
-    expectedResult.put(new TaskName(ac0.toString()), partitionac0);
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName(aa1.toString()), ImmutableSet.of(aa1, aa0))
+            .put(new TaskName(aa2.toString()), ImmutableSet.of(aa2, aa0))
+            .put(new TaskName(ac0.toString()), ImmutableSet.of(ac0, aa0))
+            .build();
 
     assertEquals(expectedResult, result);
   }
 
   @Test
-  public void testBroadcastStreamGroupedCorrectly() {
-    HashMap<String, String> configMap = new HashMap<String, String>();
-    configMap.put("task.broadcast.inputs", "SystemA.StreamA#0");
-    Config config = new MapConfig(configMap);
+  public void testSingleStreamRepartitioning() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithSingleStream = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .build();
 
-    HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
-    Collections.addAll(allSSPs, aa0, aa1, aa2, ac0);
-    GroupBySystemStreamPartitionFactory grouperFactory = new GroupBySystemStreamPartitionFactory();
-    SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config);
-    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", new Partition(partitionId)))
+            .collect(Collectors.toSet());
 
-    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(7)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(6))))
+            .build();
 
-    HashSet<SystemStreamPartition> partitionaa1 = new HashSet<SystemStreamPartition>();
-    partitionaa1.add(aa1);
-    partitionaa1.add(aa0);
-    expectedResult.put(new TaskName(aa1.toString()), partitionaa1);
+    SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new MapConfig(), new GroupBySystemStreamPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithSingleStream, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupBySystemStreamPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
+  }
 
-    HashSet<SystemStreamPartition> partitionaa2 = new HashSet<SystemStreamPartition>();
-    partitionaa2.add(aa2);
-    partitionaa2.add(aa0);
-    expectedResult.put(new TaskName(aa2.toString()), partitionaa2);
+  @Test
+  public void testMultipleStreamsWithSingleStreamRepartitioning() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .build();
 
-    HashSet<SystemStreamPartition> partitionac0 = new HashSet<SystemStreamPartition>();
-    partitionac0.add(ac0);
-    partitionac0.add(aa0);
-    expectedResult.put(new TaskName(ac0.toString()), partitionac0);
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", new Partition(partitionId)))
+            .collect(Collectors.toSet());
+    IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "BOB", new Partition(partitionId))));
+    IntStream.range(0, 4).forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "URE", new Partition(partitionId))));
 
-    assertEquals(expectedResult, result);
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)),
+                                                                                        new SystemStreamPartition("kafka", "PVE", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)),
+                                                                                        new SystemStreamPartition("kafka", "PVE", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)),
+                                                                                        new SystemStreamPartition("kafka", "PVE", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)),
+                                                                                        new SystemStreamPartition("kafka", "PVE", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .build();
+
+    SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new MapConfig(), new GroupBySystemStreamPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupBySystemStreamPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
+  }
+
+  @Test
+  public void testOnlyNewlyAddedStreams() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .build();
+
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "BOB", new Partition(partitionId)))
+            .collect(Collectors.toSet());
+
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .build();
+
+    SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new MapConfig(), new GroupBySystemStreamPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupBySystemStreamPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
+  }
+
+
+  @Test
+  public void testRemovalAndAdditionOfStreamsWithRepartitioning() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .build();
+
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", new Partition(partitionId)))
+            .collect(Collectors.toSet());
+    IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "BOB", new Partition(partitionId))));
+
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(5)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(6)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(7)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(4)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .build();
+
+    SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new MapConfig(), new GroupBySystemStreamPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupBySystemStreamPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
+  }
+
+  @Test
+  public void testMultipleStreamRepartitioningWithNewStreams() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .build();
+
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+                                                   .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", new Partition(partitionId)))
+                                                   .collect(Collectors.toSet());
+    IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "BOB", new Partition(partitionId))));
+    IntStream.range(0, 8).forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "URE", new Partition(partitionId))));
+
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 5]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 4]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 7]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, BOB, 6]"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)),
+                                                                                        new SystemStreamPartition("kafka", "PVE", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)),
+                                                                                        new SystemStreamPartition("kafka", "PVE", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)),
+                                                                                        new SystemStreamPartition("kafka", "PVE", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, PVE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)),
+                                                                                        new SystemStreamPartition("kafka", "PVE", new Partition(4))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 1]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(1)),
+                                                                                        new SystemStreamPartition("kafka", "URE", new Partition(5))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 2]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(2)),
+                                                                                        new SystemStreamPartition("kafka", "URE", new Partition(6))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 3]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(3)),
+                                                                                        new SystemStreamPartition("kafka", "URE", new Partition(7))))
+            .put(new TaskName("SystemStreamPartition [kafka, URE, 0]"), ImmutableSet.of(new SystemStreamPartition("kafka", "URE", new Partition(0)),
+                                                                                        new SystemStreamPartition("kafka", "URE", new Partition(4))))
+            .build();
+
+    SSPGrouperProxy groupBySystemStreamPartition = new SSPGrouperProxy(new MapConfig(), new GroupBySystemStreamPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupBySystemStreamPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
   }
 }


[2/2] samza git commit: SAMZA-1989: SystemStreamGrouper interface change for SEP-5

Posted by ja...@apache.org.
SAMZA-1989: SystemStreamGrouper interface change for SEP-5

Samza users may need to increase the partition count of the input streams of their stateful samza jobs. For example, Kafka needs to limit the maximum size of each partition to scale up its performance. Thus the number of partitions of a Kafka topic needs to be expanded to reduce the partition size if the average byte-in-rate or retention time of the Kafka topic has doubled.

In order to perform a join between streams, stateful jobs generally have to route the partitions from the different input streams to same task of a container. However, when a input stream repartitioning happens, key space of a partition gets redistributed. This will make the stateful jobs to produce erroneous results.

So if the partition count of input stream is increased then the users have to manually purge the changelog topics, local RocksDb state of their stateful jobs. This  results in an increased operational complexity and data loss.

This patch takes a first stab at solving the above problem and is comprised of the following changes:

* Introduce a new group method in `SystemStreamPartitionGrouper` interface to generate task assignment factoring in the partition expansion of input streams.
* Introduced a `StreamPartitionMapper` abstraction to allow the user to plugin the input stream partitioning function.
* Fixed the existing unit tests and added new unit tests to validate the new grouper changes.

In a followup PR shortly, these grouper changes would be integrated with `JobModelManager`(Waiting for PR 790 to be landed for this. It had made significant changes to `JobModelManager`)

Author: Shanthoosh Venkataraman <sp...@usc.edu>

Reviewers: Prateek M<pm...@linkedin.com>, Ray Matharu<rm...@linkedin.com>, Daniel Nishimura<dn...@linkedin.com>

Closes #803 from shanthoosh/SEP-5


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8a22e05c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8a22e05c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8a22e05c

Branch: refs/heads/master
Commit: 8a22e05c9e5096ccb37ef31bc2bed01127c75ca4
Parents: f4ba9cd
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Tue Dec 4 13:53:56 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Dec 4 13:53:56 2018 -0800

----------------------------------------------------------------------
 .../grouper/stream/GrouperContext.java          |  69 +++++
 .../stream/SystemStreamPartitionGrouper.java    |  13 +-
 .../stream/SystemStreamPartitionMapper.java     |  40 +++
 .../SystemStreamPartitionMapperFactory.java     |  26 ++
 .../grouper/stream/GroupByPartition.java        |  41 +--
 .../stream/GroupBySystemStreamPartition.java    |  23 +-
 .../stream/HashSystemStreamPartitionMapper.java |  42 +++
 .../HashSystemStreamPartitionMapperFactory.java |  33 ++
 .../grouper/stream/SSPGrouperProxy.java         | 190 ++++++++++++
 .../org/apache/samza/config/JobConfig.scala     |   9 +-
 .../grouper/stream/TestGroupByPartition.java    | 301 ++++++++++++++-----
 .../TestGroupBySystemStreamPartition.java       | 285 ++++++++++++++----
 12 files changed, 903 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-api/src/main/java/org/apache/samza/container/grouper/stream/GrouperContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/grouper/stream/GrouperContext.java b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/GrouperContext.java
new file mode 100644
index 0000000..d1118e4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/GrouperContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream;
+
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.List;
+import java.util.Set;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * A Wrapper class that holds the necessary historical metadata of the samza job which is used
+ * by the {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}
+ * to generate optimal task assignments.
+ */
+@InterfaceStability.Evolving
+public class GrouperContext {
+  private Map<String, LocationId> processorLocality;
+  private Map<TaskName, LocationId> taskLocality;
+  private Map<TaskName, Set<SystemStreamPartition>> previousTaskToSSPAssignment;
+  private Map<TaskName, String> previousTaskToContainerAssignment;
+
+  public GrouperContext(Map<String, LocationId> processorLocality, Map<TaskName, LocationId> taskLocality, Map<TaskName, Set<SystemStreamPartition>> previousTaskToSSPAssignments, Map<TaskName, String> previousTaskToContainerAssignment) {
+    this.processorLocality = processorLocality;
+    this.taskLocality = taskLocality;
+    this.previousTaskToSSPAssignment = previousTaskToSSPAssignments;
+    this.previousTaskToContainerAssignment = previousTaskToContainerAssignment;
+  }
+
+  public Map<String, LocationId> getProcessorLocality() {
+    return Collections.unmodifiableMap(processorLocality);
+  }
+
+  public Map<TaskName, LocationId> getTaskLocality() {
+    return Collections.unmodifiableMap(taskLocality);
+  }
+
+  public Map<TaskName, Set<SystemStreamPartition>> getPreviousTaskToSSPAssignment() {
+    return Collections.unmodifiableMap(previousTaskToSSPAssignment);
+  }
+
+  public List<String> getProcessorIds() {
+    return new ArrayList<>(processorLocality.keySet());
+  }
+
+  public Map<TaskName, String> getPreviousTaskToContainerAssignment() {
+    return Collections.unmodifiableMap(this.previousTaskToContainerAssignment);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java
index f374c4b..6076076 100644
--- a/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java
+++ b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java
@@ -18,12 +18,11 @@
  */
 package org.apache.samza.container.grouper.stream;
 
+import java.util.*;
+
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
 
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Group a set of SystemStreamPartitions into logical taskNames that share a common characteristic, defined
  * by the implementation.  Each taskName has a key that uniquely describes what sets may be in it, but does
@@ -37,5 +36,11 @@ import java.util.Set;
  * the TaskNames.
  */
 public interface SystemStreamPartitionGrouper {
-  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps);
+
+  /**
+   * Groups the input systemStreamPartitions into the logical taskNames.
+   * @param systemStreamPartitions the input system stream partitions.
+   * @return the grouped {@link TaskName} to {@link SystemStreamPartition} assignments.
+   */
+  Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> systemStreamPartitions);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionMapper.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionMapper.java b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionMapper.java
new file mode 100644
index 0000000..0f1f307
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * Input streams of a samza job can be either expanded or contracted by the user.
+ * This abstraction determines the previous {@link SystemStreamPartition} for a {@link SystemStreamPartition}
+ * of a input stream after the stream expansion or contraction.
+ */
+public interface SystemStreamPartitionMapper {
+
+  /**
+   * Determines the previous {@link SystemStreamPartition} for a {@link SystemStreamPartition}
+   * of a input stream after the stream expansion or contraction.
+   * @param currentSystemStreamPartition denotes the current partition after the stream partition count change.
+   * @param previousPartitionCount the partition count of the stream before the stream partition count change.
+   * @param currentPartitionCount the partition count of the stream after the stream partition count change.
+   * @return the mapped {@link SystemStreamPartition}.
+   */
+  SystemStreamPartition getPreviousSSP(SystemStreamPartition currentSystemStreamPartition,
+                                       int previousPartitionCount, int currentPartitionCount);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionMapperFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionMapperFactory.java b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionMapperFactory.java
new file mode 100644
index 0000000..d7c2569
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionMapperFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+public interface SystemStreamPartitionMapperFactory {
+  SystemStreamPartitionMapper getStreamPartitionMapper(Config config, MetricsRegistry metricsRegistry);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
index 8e3ef31..9542791 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
@@ -23,47 +23,39 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.samza.config.Config;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
 
+/**
+ * An implementation of {@link SystemStreamPartitionGrouper} that groups the input stream partitions according to their partition number.
+ * This leads to a single task processing all the messages of a single partition (e.g. partition 0) across all the input streams.
+ * Using this strategy, if two input streams have a partition 0, then all the messages from both partitions will be routed to a single task.
+ * This partitioning strategy is useful for joining and aggregating input streams.
+ */
 public class GroupByPartition implements SystemStreamPartitionGrouper {
-  private TaskConfigJava taskConfig = null;
-  private Set<SystemStreamPartition> broadcastStreams = new HashSet<SystemStreamPartition>();
-
-  /**
-   * default constructor
-   */
-  public GroupByPartition() {
-  }
+  private final Set<SystemStreamPartition> broadcastStreams;
 
-  /**
-   * Accepts the config in the constructor
-   *
-   * @param config job's config
-   */
   public GroupByPartition(Config config) {
-    if (config.containsKey(TaskConfigJava.BROADCAST_INPUT_STREAMS)) {
-      taskConfig = new TaskConfigJava(config);
-      this.broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions();
-    }
+    TaskConfigJava taskConfig = new TaskConfigJava(config);
+    this.broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions();
   }
 
   @Override
   public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
-    Map<TaskName, Set<SystemStreamPartition>> groupedMap = new HashMap<TaskName, Set<SystemStreamPartition>>();
+    Map<TaskName, Set<SystemStreamPartition>> groupedMap = new HashMap<>();
 
     for (SystemStreamPartition ssp : ssps) {
-      // notAValidEvent the broadcast streams if there is any
+      // Broadcast streams are part of all the tasks. They are added to each task at the end.
       if (broadcastStreams.contains(ssp)) {
         continue;
       }
 
-      TaskName taskName = new TaskName("Partition " + ssp.getPartition().getPartitionId());
+      TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
+
       if (!groupedMap.containsKey(taskName)) {
-        groupedMap.put(taskName, new HashSet<SystemStreamPartition>());
+        groupedMap.put(taskName, new HashSet<>());
       }
       groupedMap.get(taskName).add(ssp);
     }
@@ -71,13 +63,10 @@ public class GroupByPartition implements SystemStreamPartitionGrouper {
     // assign the broadcast streams to all the taskNames
     if (!broadcastStreams.isEmpty()) {
       for (Set<SystemStreamPartition> value : groupedMap.values()) {
-        for (SystemStreamPartition ssp : broadcastStreams) {
-          value.add(ssp);
-        }
+        value.addAll(broadcastStreams);
       }
     }
 
     return groupedMap;
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
index af96523..90828fd 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
@@ -29,20 +29,16 @@ import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
 
+/**
+ * An implementation of {@link SystemStreamPartitionGrouper} that assigns each input {@link SystemStreamPartition} to a separate task.
+ * Provides increased parallelism in message processing within a container. Useful in message processing scenarios involving remote I/O.
+ */
 public class GroupBySystemStreamPartition implements SystemStreamPartitionGrouper {
-  private TaskConfigJava taskConfig = null;
-  private Set<SystemStreamPartition> broadcastStreams = new HashSet<SystemStreamPartition>();
+  private final Set<SystemStreamPartition> broadcastStreams;
 
-  /**
-   * A constructor that accepts job config as the parameter
-   *
-   * @param config job config
-   */
   public GroupBySystemStreamPartition(Config config) {
-    if (config.containsKey(TaskConfigJava.BROADCAST_INPUT_STREAMS)) {
-      taskConfig = new TaskConfigJava(config);
-      broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions();
-    }
+    TaskConfigJava taskConfig = new TaskConfigJava(config);
+    broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions();
   }
 
   @Override
@@ -62,13 +58,10 @@ public class GroupBySystemStreamPartition implements SystemStreamPartitionGroupe
     // assign the broadcast streams to all the taskNames
     if (!broadcastStreams.isEmpty()) {
       for (Set<SystemStreamPartition> value : groupedMap.values()) {
-        for (SystemStreamPartition ssp : broadcastStreams) {
-          value.add(ssp);
-        }
+        value.addAll(broadcastStreams);
       }
     }
 
     return groupedMap;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-core/src/main/java/org/apache/samza/container/grouper/stream/HashSystemStreamPartitionMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/HashSystemStreamPartitionMapper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/HashSystemStreamPartitionMapper.java
new file mode 100644
index 0000000..7aeb3cf
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/HashSystemStreamPartitionMapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream;
+
+import com.google.common.base.Preconditions;
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * A SystemStreamPartitionMapper that uses the hash partitioning function of the producer to map a {@link SystemStreamPartition} to
+ * correct previous {@link SystemStreamPartition} after the stream expansion.
+ */
+public class HashSystemStreamPartitionMapper implements SystemStreamPartitionMapper {
+
+  @Override
+  public SystemStreamPartition getPreviousSSP(SystemStreamPartition currentSystemStreamPartition, int previousPartitionCount, int currentPartitionCount) {
+    Preconditions.checkNotNull(currentSystemStreamPartition);
+    Preconditions.checkArgument(currentPartitionCount % previousPartitionCount == 0,
+        String.format("New partition count: %d should be a multiple of previous partition count: %d.", currentPartitionCount, previousPartitionCount));
+    Partition partition = currentSystemStreamPartition.getPartition();
+    Preconditions.checkNotNull(partition, String.format("SystemStreamPartition: %s cannot have null partition", currentSystemStreamPartition));
+    int currentPartitionId = partition.getPartitionId();
+    int previousPartitionId = currentPartitionId % previousPartitionCount;
+    return new SystemStreamPartition(currentSystemStreamPartition.getSystemStream(), new Partition(previousPartitionId));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-core/src/main/java/org/apache/samza/container/grouper/stream/HashSystemStreamPartitionMapperFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/HashSystemStreamPartitionMapperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/HashSystemStreamPartitionMapperFactory.java
new file mode 100644
index 0000000..abd0f49
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/HashSystemStreamPartitionMapperFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Instantiates the {@link HashSystemStreamPartitionMapper} based upon the provided
+ * config and the metricsRegistry.
+ */
+public class HashSystemStreamPartitionMapperFactory implements SystemStreamPartitionMapperFactory {
+  @Override
+  public SystemStreamPartitionMapper getStreamPartitionMapper(Config config, MetricsRegistry metricsRegistry) {
+    return new HashSystemStreamPartitionMapper();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
new file mode 100644
index 0000000..0330700
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.google.common.base.Preconditions;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a stream expansion aware task to partition assignments on top of a custom implementation
+ * of the {@link SystemStreamPartitionGrouper}.
+ */
+public class SSPGrouperProxy {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SSPGrouperProxy.class);
+
+  private final SystemStreamPartitionMapper systemStreamPartitionMapper;
+  private final Set<SystemStreamPartition> broadcastSystemStreamPartitions;
+  private final SystemStreamPartitionGrouper grouper;
+
+  public SSPGrouperProxy(Config config, SystemStreamPartitionGrouper grouper) {
+    Preconditions.checkNotNull(config);
+    Preconditions.checkNotNull(grouper);
+    this.grouper = grouper;
+    this.broadcastSystemStreamPartitions = new TaskConfigJava(config).getBroadcastSystemStreamPartitions();
+    this.systemStreamPartitionMapper = getSystemStreamPartitionMapper(config);
+  }
+
+  /**
+   * 1. Invokes the sub-class {@link org.apache.samza.container.grouper.task.TaskNameGrouper#group(Set)} method to generate the task to partition assignments.
+   * 2. Uses the {@link SystemStreamPartitionMapper} to compute the previous {@link SystemStreamPartition} for every partition of the expanded or contracted streams.
+   * 3. Uses the previous, current task to partition assignments and result of {@link SystemStreamPartitionMapper} to redistribute the expanded {@link SystemStreamPartition}'s
+   * to correct tasks after the stream expansion or contraction.
+   * @param ssps the input system stream partitions of the job.
+   * @param grouperContext the grouper context holding metadata for the previous job execution.
+   * @return the grouped {@link TaskName} to {@link SystemStreamPartition} assignments.
+   */
+  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps, GrouperContext grouperContext) {
+    Map<TaskName, Set<SystemStreamPartition>> currentTaskAssignments = grouper.group(ssps);
+
+    if (grouperContext.getPreviousTaskToSSPAssignment().isEmpty()) {
+      LOGGER.info("Previous task to partition assignment does not exist. Using the result from the group method.");
+      return currentTaskAssignments;
+    }
+
+    Map<SystemStreamPartition, TaskName> previousSSPToTask = getPreviousSSPToTaskMapping(grouperContext);
+
+    Map<TaskName, PartitionGroup> taskToPartitionGroup = new HashMap<>();
+    currentTaskAssignments.forEach((taskName, systemStreamPartitions) -> taskToPartitionGroup.put(taskName, new PartitionGroup(taskName, systemStreamPartitions)));
+
+    Map<SystemStream, Integer> previousStreamToPartitionCount = getSystemStreamToPartitionCount(grouperContext.getPreviousTaskToSSPAssignment());
+    Map<SystemStream, Integer> currentStreamToPartitionCount = getSystemStreamToPartitionCount(currentTaskAssignments);
+
+    for (Map.Entry<TaskName, Set<SystemStreamPartition>> entry : currentTaskAssignments.entrySet()) {
+      TaskName currentlyAssignedTask = entry.getKey();
+      for (SystemStreamPartition currentSystemStreamPartition : entry.getValue()) {
+        if (broadcastSystemStreamPartitions.contains(currentSystemStreamPartition)) {
+          LOGGER.info("SystemStreamPartition: {} is part of broadcast stream. Skipping reassignment.", currentSystemStreamPartition);
+        } else {
+          SystemStream systemStream = currentSystemStreamPartition.getSystemStream();
+
+          Integer previousStreamPartitionCount = previousStreamToPartitionCount.getOrDefault(systemStream, 0);
+          Integer currentStreamPartitionCount = currentStreamToPartitionCount.getOrDefault(systemStream, 0);
+
+          if (previousStreamPartitionCount > 0 && !currentStreamPartitionCount.equals(previousStreamPartitionCount)) {
+            LOGGER.info("Partition count of system stream: {} had changed from: {} to: {} partitions. Performing partition reassignment.", systemStream, previousStreamPartitionCount, currentStreamPartitionCount);
+
+            SystemStreamPartition previousSystemStreamPartition = systemStreamPartitionMapper.getPreviousSSP(currentSystemStreamPartition, previousStreamPartitionCount, currentStreamPartitionCount);
+            TaskName previouslyAssignedTask = previousSSPToTask.get(previousSystemStreamPartition);
+
+            LOGGER.info("Moving systemStreamPartition: {} from task: {} to task: {}.", currentSystemStreamPartition, currentlyAssignedTask, previouslyAssignedTask);
+
+            taskToPartitionGroup.get(currentlyAssignedTask).removeSSP(currentSystemStreamPartition);
+            taskToPartitionGroup.get(previouslyAssignedTask).addSSP(currentSystemStreamPartition);
+          } else {
+            LOGGER.debug("No partition change in SystemStream: {}. Skipping partition reassignment.", systemStream);
+          }
+        }
+      }
+    }
+
+    Map<TaskName, Set<SystemStreamPartition>> taskToSystemStreamPartitions = new HashMap<>();
+    for (PartitionGroup partitionGroup : taskToPartitionGroup.values()) {
+      if (!partitionGroup.systemStreamPartitions.isEmpty()) {
+        taskToSystemStreamPartitions.put(partitionGroup.taskName, partitionGroup.systemStreamPartitions);
+      }
+    }
+    return taskToSystemStreamPartitions;
+  }
+
+  /**
+   * Computes a mapping from system stream to partition count using the provided {@param taskToSSPAssignment}.
+   * @param taskToSSPAssignment the {@link TaskName} to {@link SystemStreamPartition}'s assignment of the job.
+   * @return a mapping from {@link SystemStream} to the number of partitions of the stream.
+   */
+  private Map<SystemStream, Integer> getSystemStreamToPartitionCount(Map<TaskName, Set<SystemStreamPartition>> taskToSSPAssignment) {
+    Map<SystemStream, Integer> systemStreamToPartitionCount = new HashMap<>();
+    taskToSSPAssignment.forEach((taskName, systemStreamPartitions) -> {
+        systemStreamPartitions.forEach(systemStreamPartition -> {
+            SystemStream systemStream = systemStreamPartition.getSystemStream();
+            systemStreamToPartitionCount.put(systemStream, systemStreamToPartitionCount.getOrDefault(systemStream, 0) + 1);
+          });
+      });
+
+    return systemStreamToPartitionCount;
+  }
+
+  /**
+   * Computes a mapping from the {@link SystemStreamPartition} to {@link TaskName} using the provided {@param grouperContext}
+   * @param grouperContext the grouper context that contains relevant historical metadata about the job.
+   * @return a mapping from {@link SystemStreamPartition} to {@link TaskName}.
+   */
+  private Map<SystemStreamPartition, TaskName> getPreviousSSPToTaskMapping(GrouperContext grouperContext) {
+    Map<SystemStreamPartition, TaskName> sspToTaskMapping = new HashMap<>();
+    Map<TaskName, Set<SystemStreamPartition>> previousTaskToSSPAssignment = grouperContext.getPreviousTaskToSSPAssignment();
+    previousTaskToSSPAssignment.forEach((taskName, systemStreamPartitions) -> {
+        systemStreamPartitions.forEach(systemStreamPartition -> {
+            if (!broadcastSystemStreamPartitions.contains(systemStreamPartition)) {
+              sspToTaskMapping.put(systemStreamPartition, taskName);
+            }
+          });
+      });
+    return sspToTaskMapping;
+  }
+
+  /**
+   * Creates a instance of {@link SystemStreamPartitionMapper} using the stream partition expansion factory class
+   * defined in the {@param config}.
+   * @param config the configuration of the samza job.
+   * @return the instantiated {@link SystemStreamPartitionMapper} object.
+   */
+  private SystemStreamPartitionMapper getSystemStreamPartitionMapper(Config config) {
+    JobConfig jobConfig = new JobConfig(config);
+    String systemStreamPartitionMapperClass = jobConfig.getSystemStreamPartitionMapperFactoryName();
+    SystemStreamPartitionMapperFactory systemStreamPartitionMapperFactory = Util.getObj(systemStreamPartitionMapperClass, SystemStreamPartitionMapperFactory.class);
+    return systemStreamPartitionMapperFactory.getStreamPartitionMapper(config, new MetricsRegistryMap());
+  }
+
+  /**
+   * A mutable group of {@link SystemStreamPartition} and {@link TaskName}.
+   * Used to hold the interim results until the final task assignments are known.
+   */
+  private static class PartitionGroup {
+    private TaskName taskName;
+    private Set<SystemStreamPartition> systemStreamPartitions;
+
+    PartitionGroup(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions) {
+      Preconditions.checkNotNull(taskName);
+      Preconditions.checkNotNull(systemStreamPartitions);
+      this.taskName = taskName;
+      this.systemStreamPartitions = new HashSet<>(systemStreamPartitions);
+    }
+
+    void removeSSP(SystemStreamPartition systemStreamPartition) {
+      systemStreamPartitions.remove(systemStreamPartition);
+    }
+
+    void addSSP(SystemStreamPartition systemStreamPartition) {
+      systemStreamPartitions.add(systemStreamPartition);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 5363e72..1b55fad 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -19,11 +19,10 @@
 
 package org.apache.samza.config
 
-
 import java.io.File
 import java.util.regex.Pattern
 
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.container.grouper.stream.{GroupByPartitionFactory, HashSystemStreamPartitionMapperFactory}
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory
 import org.apache.samza.runtime.DefaultLocationIdProviderFactory
 import org.apache.samza.util.Logging
@@ -114,6 +113,8 @@ object JobConfig {
   val DIAGNOSTICS_APPENDER_CLASS = "job.diagnostics.appender.class"
   val DEFAULT_DIAGNOSTICS_APPENDER_CLASS = "org.apache.samza.logging.log4j.SimpleDiagnosticsAppender"
 
+  val SYSTEM_STREAM_PARTITION_MAPPER_FACTORY = "job.system.stream.partition.mapper.factory"
+
   implicit def Config2Job(config: Config) = new JobConfig(config)
 
   /**
@@ -260,4 +261,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getJMXEnabled = {
     getBoolean(JobConfig.JOB_JMX_ENABLED, true);
   }
+
+  def getSystemStreamPartitionMapperFactoryName: String = {
+    get(JobConfig.SYSTEM_STREAM_PARTITION_MAPPER_FACTORY, classOf[HashSystemStreamPartitionMapperFactory].getName)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8a22e05c/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
index 2a8d447..b0e0ccd 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
@@ -19,112 +19,277 @@
 
 package org.apache.samza.container.grouper.stream;
 
-import static org.junit.Assert.*;
-
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class TestGroupByPartition {
-  SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0));
-  SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1));
-  SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2));
-  SystemStreamPartition ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1));
-  SystemStreamPartition ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2));
-  SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0));
+  private SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0));
+  private SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1));
+  private SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2));
+  private SystemStreamPartition ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1));
+  private SystemStreamPartition ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2));
+  private SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0));
 
   @Test
   public void testLocalStreamsGroupedCorrectly() {
-    HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
-    GroupByPartition grouper = new GroupByPartition();
-    Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(allSSPs);
-    // empty SSP set gets empty groups
+    GroupByPartition grouper = new GroupByPartition(new MapConfig());
+    Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(new HashSet<>());
     assertTrue(emptyResult.isEmpty());
 
-    Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0);
-    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
-    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ab1, ab2, ac0));
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 0"), ImmutableSet.of(aa0, ac0))
+            .put(new TaskName("Partition 1"), ImmutableSet.of(aa1, ab1))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(aa2, ab2))
+            .build();
 
-    HashSet<SystemStreamPartition> partition0 = new HashSet<SystemStreamPartition>();
-    partition0.add(aa0);
-    partition0.add(ac0);
-    expectedResult.put(new TaskName("Partition 0"), partition0);
+    assertEquals(expectedResult, result);
+  }
 
-    HashSet<SystemStreamPartition> partition1 = new HashSet<SystemStreamPartition>();
-    partition1.add(aa1);
-    partition1.add(ab1);
-    expectedResult.put(new TaskName("Partition 1"), partition1);
+  @Test
+  public void testBroadcastStreamsGroupedCorrectly() {
+    Config config = new MapConfig(ImmutableMap.of("task.broadcast.inputs", "SystemA.StreamA#0, SystemA.StreamB#1"));
+    GroupByPartition grouper = new GroupByPartition(config);
 
-    HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>();
-    partition2.add(aa2);
-    partition2.add(ab2);
-    expectedResult.put(new TaskName("Partition 2"), partition2);
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ab1, ab2, ac0));
+
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 0"), ImmutableSet.of(aa0, ac0, ab1))
+            .put(new TaskName("Partition 1"), ImmutableSet.of(aa1, aa0, ab1))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(aa2, aa0, ab2, ab1))
+            .build();
 
     assertEquals(expectedResult, result);
   }
 
   @Test
-  public void testBroadcastStreamsGroupedCorrectly() {
-    HashMap<String, String> configMap = new HashMap<String, String>();
-    configMap.put("task.broadcast.inputs", "SystemA.StreamA#0, SystemA.StreamB#1");
-    Config config = new MapConfig(configMap);
+  public void testNoTaskOnlyContainsBroadcastStreams() {
+    Config config = new MapConfig(ImmutableMap.of("task.broadcast.inputs", "SystemA.StreamA#0, SystemA.StreamB#1"));
     GroupByPartition grouper = new GroupByPartition(config);
 
-    HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
-    Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0);
-    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, ab1, ab2));
 
-    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 2"), ImmutableSet.of(aa0, ab1, ab2)).build();
 
-    HashSet<SystemStreamPartition> partition0 = new HashSet<SystemStreamPartition>();
-    partition0.add(aa0); // broadcast stream
-    partition0.add(ac0);
-    partition0.add(ab1); // broadcast stream
-    expectedResult.put(new TaskName("Partition 0"), partition0);
+    assertEquals(expectedResult, result);
+  }
 
-    HashSet<SystemStreamPartition> partition1 = new HashSet<SystemStreamPartition>();
-    partition1.add(aa1);
-    partition1.add(ab1); // broadcast stream
-    partition1.add(aa0); // broadcast stream
-    expectedResult.put(new TaskName("Partition 1"), partition1);
+  @Test
+  public void testSingleStreamRepartitioning() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithSingleStream = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 0"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0))))
+            .put(new TaskName("Partition 1"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1))))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2))))
+            .put(new TaskName("Partition 3"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .build();
 
-    HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>();
-    partition2.add(aa2);
-    partition2.add(ab2);
-    partition2.add(aa0); // broadcast stream
-    partition2.add(ab1); // broadcast stream
-    expectedResult.put(new TaskName("Partition 2"), partition2);
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", new Partition(partitionId)))
+            .collect(Collectors.toSet());
 
-    assertEquals(expectedResult, result);
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 1"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)),
+                                                              new SystemStreamPartition("kafka", "PVE", new Partition(5))))
+            .put(new TaskName("Partition 0"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)),
+                                                              new SystemStreamPartition("kafka", "PVE", new Partition(4))))
+            .put(new TaskName("Partition 3"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(7)),
+                                                              new SystemStreamPartition("kafka", "PVE", new Partition(3))))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)),
+                                                              new SystemStreamPartition("kafka", "PVE", new Partition(6))))
+            .build();
+
+    SSPGrouperProxy groupByPartition = new SSPGrouperProxy(new MapConfig(), new GroupByPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithSingleStream, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupByPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
   }
 
   @Test
-  public void testNoTaskOnlyContainsBroadcastStreams() {
-    HashMap<String, String> configMap = new HashMap<String, String>();
-    configMap.put("task.broadcast.inputs", "SystemA.StreamA#0, SystemA.StreamB#1");
-    Config config = new MapConfig(configMap);
-    GroupByPartition grouper = new GroupByPartition(config);
+  public void testMultipleStreamsWithSingleStreamRepartitioning() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 0"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)), new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .put(new TaskName("Partition 1"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)), new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)), new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("Partition 3"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)), new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .build();
 
-    HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
-    Collections.addAll(allSSPs, aa0, ab1, ab2);
-    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "PVE", new Partition(partitionId)))
+            .collect(Collectors.toSet());
+    IntStream.range(0, 4)
+             .forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "URE", new Partition(partitionId))));
+    IntStream.range(0, 8)
+             .forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "BOB", new Partition(partitionId))));
 
-    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
-    HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>();
-    partition2.add(aa0); // broadcast stream
-    partition2.add(ab1);
-    partition2.add(ab2); // broadcast stream
-    expectedResult.put(new TaskName("Partition 2"), partition2);
 
-    assertEquals(expectedResult, result);
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 1"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)),
+                                                              new SystemStreamPartition("kafka", "PVE", new Partition(5)),
+                                                              new SystemStreamPartition("kafka", "URE", new Partition(1)),
+                                                              new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("Partition 0"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)),
+                                                              new SystemStreamPartition("kafka", "PVE", new Partition(4)),
+                                                              new SystemStreamPartition("kafka", "URE", new Partition(0)),
+                                                              new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .put(new TaskName("Partition 3"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(7)),
+                                                              new SystemStreamPartition("kafka", "PVE", new Partition(3)),
+                                                              new SystemStreamPartition("kafka", "URE", new Partition(3)),
+                                                              new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)),
+                                                              new SystemStreamPartition("kafka", "PVE", new Partition(6)),
+                                                              new SystemStreamPartition("kafka", "URE", new Partition(2)),
+                                                              new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("Partition 5"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("Partition 4"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("Partition 7"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("Partition 6"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .build();
+
+    SSPGrouperProxy groupByPartition = new SSPGrouperProxy(new MapConfig(), new GroupByPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupByPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
+  }
+
+  @Test
+  public void testOnlyNewlyAddedStreams() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 0"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)), new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .put(new TaskName("Partition 1"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)), new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)), new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("Partition 3"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)), new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .build();
+
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "BOB", new Partition(partitionId)))
+            .collect(Collectors.toSet());
+
+    // expected Grouping
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 5"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("Partition 4"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("Partition 7"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("Partition 6"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .put(new TaskName("Partition 0"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .put(new TaskName("Partition 1"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("Partition 3"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .build();
+
+    SSPGrouperProxy groupByPartition = new SSPGrouperProxy(new MapConfig(), new GroupByPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupByPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
+  }
+
+
+  @Test
+  public void testRemovalAndAdditionOfStreamsWithRepartitioning() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 0"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)), new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .put(new TaskName("Partition 1"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)), new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)), new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("Partition 3"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)), new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .build();
+
+    Set<SystemStreamPartition> currSsps = IntStream.range(0, 8)
+            .mapToObj(partitionId -> new SystemStreamPartition("kafka", "BOB", new Partition(partitionId)))
+            .collect(Collectors.toSet());
+
+    IntStream.range(0, 8)
+             .forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "PVE", new Partition(partitionId))));
+
+    // expected grouping
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 1"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(5)),
+                    new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("Partition 0"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(4)),
+                    new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .put(new TaskName("Partition 3"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(7)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(3)),
+                    new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(6)),
+                    new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("Partition 5"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("Partition 4"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("Partition 7"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("Partition 6"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .build();
+
+    SSPGrouperProxy groupByPartition = new SSPGrouperProxy(new MapConfig(), new GroupByPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupByPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
+  }
+
+  @Test
+  public void testMultipleStreamRepartitioningWithNewStreams() {
+    Map<TaskName, Set<SystemStreamPartition>> prevGroupingWithMultipleStreams = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 0"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)), new SystemStreamPartition("kafka", "URE", new Partition(0))))
+            .put(new TaskName("Partition 1"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)), new SystemStreamPartition("kafka", "URE", new Partition(1))))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)), new SystemStreamPartition("kafka", "URE", new Partition(2))))
+            .put(new TaskName("Partition 3"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(3)), new SystemStreamPartition("kafka", "URE", new Partition(3))))
+            .build();
+
+    Set<SystemStreamPartition> currSsps = new HashSet<>();
+    IntStream.range(0, 8)
+             .forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "PVE", new Partition(partitionId))));
+    IntStream.range(0, 8)
+             .forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "URE", new Partition(partitionId))));
+    IntStream.range(0, 8)
+             .forEach(partitionId -> currSsps.add(new SystemStreamPartition("kafka", "BOB", new Partition(partitionId))));
+
+    Map<TaskName, Set<SystemStreamPartition>> expectedGrouping = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
+            .put(new TaskName("Partition 1"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(1)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(5)),
+                    new SystemStreamPartition("kafka", "URE", new Partition(1)),
+                    new SystemStreamPartition("kafka", "URE", new Partition(5)),
+                    new SystemStreamPartition("kafka", "BOB", new Partition(1))))
+            .put(new TaskName("Partition 0"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(0)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(4)),
+                    new SystemStreamPartition("kafka", "URE", new Partition(0)),
+                    new SystemStreamPartition("kafka", "URE", new Partition(4)),
+                    new SystemStreamPartition("kafka", "BOB", new Partition(0))))
+            .put(new TaskName("Partition 3"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(7)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(3)),
+                    new SystemStreamPartition("kafka", "URE", new Partition(3)),
+                    new SystemStreamPartition("kafka", "URE", new Partition(7)),
+                    new SystemStreamPartition("kafka", "BOB", new Partition(3))))
+            .put(new TaskName("Partition 2"), ImmutableSet.of(new SystemStreamPartition("kafka", "PVE", new Partition(2)),
+                    new SystemStreamPartition("kafka", "PVE", new Partition(6)),
+                    new SystemStreamPartition("kafka", "URE", new Partition(2)),
+                    new SystemStreamPartition("kafka", "URE", new Partition(6)),
+                    new SystemStreamPartition("kafka", "BOB", new Partition(2))))
+            .put(new TaskName("Partition 5"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(5))))
+            .put(new TaskName("Partition 4"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(4))))
+            .put(new TaskName("Partition 7"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(7))))
+            .put(new TaskName("Partition 6"), ImmutableSet.of(new SystemStreamPartition("kafka", "BOB", new Partition(6))))
+            .build();
+
+
+    SSPGrouperProxy groupByPartition = new SSPGrouperProxy(new MapConfig(), new GroupByPartition(new MapConfig()));
+    GrouperContext grouperContext = new GrouperContext(new HashMap<>(), new HashMap<>(), prevGroupingWithMultipleStreams, new HashMap<>());
+    Map<TaskName, Set<SystemStreamPartition>> finalGrouping = groupByPartition.group(currSsps, grouperContext);
+    Assert.assertEquals(expectedGrouping, finalGrouping);
   }
 }