You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/10/03 22:23:45 UTC

[07/16] samza git commit: Merge branch 'master' into 0.14.0

Merge branch 'master' into 0.14.0


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1701ea84
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1701ea84
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1701ea84

Branch: refs/heads/master
Commit: 1701ea84a2e029b3297687b5fc814998371b1a6f
Parents: 79200c7 fb39a51
Author: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Authored: Tue Sep 12 11:32:36 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Committed: Tue Sep 12 11:32:36 2017 -0700

----------------------------------------------------------------------
 KEYS                                            |  57 +++
 NOTICE                                          |   1 +
 README.md                                       |   2 +-
 bin/check-all.sh                                |   2 +-
 build.gradle                                    |   4 +-
 docs/contribute/tests.md                        |   2 +-
 .../versioned/jobs/configuration-table.html     |  14 +-
 .../versioned/hello-samza-high-level-yarn.md    |   2 +-
 .../versioned/hello-samza-high-level-zk.md      |   2 +-
 .../versioned/samza-rest-getting-started.md     |   2 +-
 docs/startup/download/index.md                  |  17 +-
 docs/startup/hello-samza/versioned/index.md     |   2 +-
 docs/startup/preview/index.md                   |   2 +-
 .../main/java/org/apache/samza/AzureClient.java |  20 +-
 .../main/java/org/apache/samza/AzureConfig.java |  73 ---
 .../main/java/org/apache/samza/BlobUtils.java   | 280 ----------
 .../java/org/apache/samza/JobModelBundle.java   |  61 ---
 .../java/org/apache/samza/LeaseBlobManager.java |  98 ----
 .../java/org/apache/samza/ProcessorEntity.java  |  58 ---
 .../main/java/org/apache/samza/TableUtils.java  | 198 --------
 .../org/apache/samza/config/AzureConfig.java    |  68 +++
 .../coordinator/AzureCoordinationUtils.java     |  58 +++
 .../AzureCoordinationUtilsFactory.java          |  30 ++
 .../samza/coordinator/AzureJobCoordinator.java  | 509 +++++++++++++++++++
 .../coordinator/AzureJobCoordinatorFactory.java |  29 ++
 .../samza/coordinator/AzureLeaderElector.java   | 109 ++++
 .../org/apache/samza/coordinator/AzureLock.java | 100 ++++
 .../samza/coordinator/DistributedLock.java      |  39 ++
 .../samza/coordinator/data/BarrierState.java    |  27 +
 .../samza/coordinator/data/JobModelBundle.java  |  61 +++
 .../samza/coordinator/data/ProcessorEntity.java |  62 +++
 .../scheduler/HeartbeatScheduler.java           |  81 +++
 .../scheduler/JMVersionUpgradeScheduler.java    |  99 ++++
 .../LeaderBarrierCompleteScheduler.java         | 118 +++++
 .../scheduler/LeaderLivenessCheckScheduler.java | 120 +++++
 .../scheduler/LivenessCheckScheduler.java       | 108 ++++
 .../scheduler/RenewLeaseScheduler.java          |  79 +++
 .../scheduler/SchedulerStateChangeListener.java |  29 ++
 .../coordinator/scheduler/TaskScheduler.java    |  35 ++
 .../java/org/apache/samza/util/BlobUtils.java   | 284 +++++++++++
 .../org/apache/samza/util/LeaseBlobManager.java |  99 ++++
 .../java/org/apache/samza/util/TableUtils.java  | 205 ++++++++
 .../samza/config/JobCoordinatorConfig.java      |  21 +
 .../apache/samza/container/LocalityManager.java |   6 +-
 .../coordinator/CoordinationServiceFactory.java |  36 --
 .../samza/coordinator/CoordinationUtils.java    |  14 +-
 .../coordinator/CoordinationUtilsFactory.java   |  47 ++
 .../coordinator/DistributedLockWithState.java   |  42 ++
 .../samza/runtime/LocalApplicationRunner.java   |  91 ++--
 .../org/apache/samza/task/AsyncRunLoop.java     |  14 +
 .../samza/zk/ZkBarrierForVersionUpgrade.java    |   2 +-
 .../org/apache/samza/zk/ZkControllerImpl.java   |  19 +-
 .../samza/zk/ZkCoordinationServiceFactory.java  |  89 ----
 .../apache/samza/zk/ZkCoordinationUtils.java    |  26 +-
 .../samza/zk/ZkCoordinationUtilsFactory.java    |  89 ++++
 .../org/apache/samza/zk/ZkDistributedLock.java  | 117 +++++
 .../samza/zk/ZkJobCoordinatorFactory.java       |  23 +-
 .../org/apache/samza/zk/ZkLeaderElector.java    |   2 +-
 .../org/apache/samza/zk/ZkProcessorLatch.java   |  23 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  |  41 +-
 .../apache/samza/checkpoint/OffsetManager.scala |  50 +-
 .../apache/samza/container/TaskInstance.scala   |   4 +-
 .../runtime/TestApplicationRunnerMain.java      |   2 +-
 .../runtime/TestLocalApplicationRunner.java     | 185 ++++---
 .../org/apache/samza/task/TestAsyncRunLoop.java |  24 +-
 .../apache/samza/zk/TestZkLeaderElector.java    |   7 +-
 .../org/apache/samza/zk/TestZkNamespace.java    |   8 +-
 .../apache/samza/zk/TestZkProcessorLatch.java   |   2 +-
 .../java/org/apache/samza/zk/TestZkUtils.java   |  57 ++-
 .../samza/checkpoint/TestOffsetManager.scala    |  58 ++-
 .../samza/config/TestJobCoordinatorConfig.java  |  58 +++
 .../samza/container/TestTaskInstance.scala      |  62 ++-
 .../org/apache/samza/rest/SamzaRestService.java |  14 +-
 .../processor/TestZkStreamProcessorSession.java |   3 +-
 .../processor/TestZkLocalApplicationRunner.java |  60 ++-
 .../test/integration/TestStatefulTask.scala     |  79 ++-
 settings.gradle                                 |  17 +
 77 files changed, 3444 insertions(+), 1194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --cc samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 4be4e73,f9c1252..5b2c661
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@@ -19,7 -19,13 +19,14 @@@
  
  package org.apache.samza.runtime;
  
+ import com.google.common.collect.ImmutableList;
+ import java.lang.reflect.Field;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.stream.Collectors;
 +import java.util.Set;
  import org.apache.samza.application.StreamApplication;
  import org.apache.samza.config.ApplicationConfig;
  import org.apache.samza.config.JobConfig;
@@@ -343,8 -324,73 +325,77 @@@ public class TestLocalApplicationRunne
      assertEquals(spy.status(app), ApplicationStatus.UnsuccessfulFinish);
    }
  
 +  public static Set<StreamProcessor> getProcessors(LocalApplicationRunner localRunner) {
 +    return localRunner.getProcessors();
 +  }
 +
+   /**
+    * Verifies that changing the topological sort order of the intermediate streams changes the plan hash.
+    * Note: for the sake of these test cases, the job plan is identical apart from the intermediate streams.
+    */
+   @Test
+   public void testPlanIdWithShuffledStreamSpecs() {
+     List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+         new StreamSpec("test-stream-2", "stream-2", "testStream"),
+         new StreamSpec("test-stream-3", "stream-3", "testStream"));
+     String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
+     // Same specs as above with the first two swapped, i.e. only the order differs.
+     List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-2", "stream-2", "testStream"),
+         new StreamSpec("test-stream-1", "stream-1", "testStream"),
+         new StreamSpec("test-stream-3", "stream-3", "testStream"));
+ 
+ 
+     assertFalse("Expected both of the latch ids to be different",
+         planIdBeforeShuffle.equals(getExecutionPlanId(shuffledStreamSpecs)));
+   }
+ 
+   /**
+    * Verifies that generating the plan id twice for the same stream specs yields the same hash.
+    * Note: for the sake of these test cases, the job plan is identical apart from the intermediate streams.
+    */
+   @Test
+   public void testGeneratePlanIdWithSameStreamSpecs() {
+     List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+         new StreamSpec("test-stream-2", "stream-2", "testStream"),
+         new StreamSpec("test-stream-3", "stream-3", "testStream"));
+     String planIdForFirstAttempt = getExecutionPlanId(streamSpecs);
+     String planIdForSecondAttempt = getExecutionPlanId(streamSpecs);
+     // The literal pins the expected hash of the rendered plan JSON for these three specs.
+     assertEquals("Expected latch ids to match!", "1447946713", planIdForFirstAttempt);
+     assertEquals("Expected latch ids to match for the second attempt!", planIdForFirstAttempt, planIdForSecondAttempt);
+   }
+ 
+   /**
+    * Verifies that replacing one of the intermediate streams changes the plan hash.
+    * Note: for the sake of these test cases, the job plan is identical apart from the intermediate streams.
+    */
+   @Test
+   public void testGeneratePlanIdWithDifferentStreamSpecs() {
+     List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+         new StreamSpec("test-stream-2", "stream-2", "testStream"),
+         new StreamSpec("test-stream-3", "stream-3", "testStream"));
+     String planIdBeforeUpdate = getExecutionPlanId(streamSpecs);
+     // Same specs as above, except test-stream-2 is replaced by test-stream-4.
+     List<StreamSpec> updatedStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+         new StreamSpec("test-stream-4", "stream-4", "testStream"),
+         new StreamSpec("test-stream-3", "stream-3", "testStream"));
+ 
+ 
+     assertFalse("Expected both of the latch ids to be different",
+         planIdBeforeUpdate.equals(getExecutionPlanId(updatedStreamSpecs)));
+   }
+ 
+   private String getExecutionPlanId(List<StreamSpec> streamSpecs) {
+     String intermediateStreamJson =
+         streamSpecs.stream().map(this::streamSpecToJson).collect(Collectors.joining(","));
+     // The plan id is the String hash code of PLAN_JSON rendered with the joined stream specs.
+     int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode();
+ 
+     return String.valueOf(planId);
+   }
+ 
+   private String streamSpecToJson(StreamSpec spec) {
+     String specId = spec.getId();
+     return String.format(STREAM_SPEC_JSON_FORMAT, specId, specId, spec.getSystemName(), spec.getPhysicalName());
+   }
  }

http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --cc samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index dcb06d3,4958a57..81f3ed1
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@@ -19,34 -19,16 +19,32 @@@
  
  package org.apache.samza.container
  
 +
 +import java.util
 +import java.util
 +import java.util.Collections
  import java.util.concurrent.ConcurrentHashMap
 +import com.google.common.collect.Multimap
 +import org.apache.samza.SamzaException
+ 
  import org.apache.samza.Partition
 +import org.apache.samza.checkpoint.OffsetManager
 +import org.apache.samza.config.Config
 +import org.apache.samza.config.MapConfig
 +import org.apache.samza.control.ControlMessageUtils
 +import org.apache.samza.job.model.ContainerModel
 +import org.apache.samza.job.model.JobModel
 +import org.apache.samza.job.model.TaskModel
 +import org.apache.samza.metrics.Counter
 +import org.apache.samza.metrics.Metric
 +import org.apache.samza.metrics.MetricsRegistryMap
+ import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
+ import org.apache.samza.config.{Config, MapConfig}
+ import org.apache.samza.metrics.{Counter, Metric, MetricsRegistryMap}
  import org.apache.samza.serializers.SerdeManager
- import org.apache.samza.system.IncomingMessageEnvelope
- import org.apache.samza.system.SystemConsumer
- import org.apache.samza.system.SystemConsumers
- import org.apache.samza.system.SystemProducer
- import org.apache.samza.system.SystemProducers
- import org.apache.samza.system.SystemStream
- import org.apache.samza.system.SystemStreamMetadata
+ import org.apache.samza.storage.TaskStorageManager
  import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
- import org.apache.samza.system.SystemStreamPartition
+ import org.apache.samza.system._
  import org.apache.samza.system.chooser.RoundRobinChooser
  import org.apache.samza.task._
  import org.junit.Assert._
@@@ -365,34 -350,47 +366,77 @@@ class TestTaskInstance 
    }
  
    @Test
+   def testCommitOrder(): Unit = {
+     // Simple objects
+     val partition = new Partition(0)
+     val taskName = new TaskName("taskName")
+     val systemStream = new SystemStream("test-system", "test-stream")
+     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+     val checkpoint = new Checkpoint(Map(systemStreamPartition -> "4").asJava)
+ 
+     // Mocks; inOrder lets us assert the relative ordering of calls across these three collaborators
+     val collector = Mockito.mock(classOf[TaskInstanceCollector])
+     val storageManager = Mockito.mock(classOf[TaskStorageManager])
+     val offsetManager = Mockito.mock(classOf[OffsetManager])
+     when(offsetManager.buildCheckpoint(any())).thenReturn(checkpoint)
+     val mockOrder = inOrder(offsetManager, collector, storageManager)
+ 
+     val taskInstance: TaskInstance = new TaskInstance(
+       Mockito.mock(classOf[StreamTask]),
+       taskName,
+       new MapConfig,
+       new TaskInstanceMetrics,
+       null,
+       Mockito.mock(classOf[SystemConsumers]),
+       collector,
+       Mockito.mock(classOf[SamzaContainerContext]),
+       offsetManager,
+       storageManager,
+       systemStreamPartitions = Set(systemStreamPartition))
+ 
+     taskInstance.commit
+ 
+     // We must first get a snapshot of the checkpoint so it doesn't change while we flush. SAMZA-1384
+     mockOrder.verify(offsetManager).buildCheckpoint(taskName)
+     // Producers must be flushed next, ideally with the output flushed before the changelog,
+     // so that the changelog and checkpoints (state and inputs) are captured last.
+     mockOrder.verify(collector).flush
+     // Local state is next, to ensure that the state (particularly the offset file) never points to a newer changelog
+     // offset than what is reflected in the on-disk state.
+     mockOrder.verify(storageManager).flush()
+     // Finally, checkpoint the inputs with the snapshotted checkpoint captured at the beginning of commit
+     mockOrder.verify(offsetManager).writeCheckpoint(taskName, checkpoint)
+   }
++
++  @Test
 +  def testBuildInputToTasks(): Unit = {
 +    val system: String = "test-system"
 +    val stream0: String = "test-stream-0"
 +    val stream1: String = "test-stream-1"
 +
 +    val ssp0: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(0))
 +    val ssp1: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(1))
 +    val ssp2: SystemStreamPartition = new SystemStreamPartition(system, stream1, new Partition(0))
 +
 +    val task0: TaskName = new TaskName("Task 0")
 +    val task1: TaskName = new TaskName("Task 1")
 +    val ssps: util.Set[SystemStreamPartition] = new util.HashSet[SystemStreamPartition]
 +    ssps.add(ssp0)
 +    ssps.add(ssp2)
 +    val tm0: TaskModel = new TaskModel(task0, ssps, new Partition(0))
 +    val cm0: ContainerModel = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0))
 +    val tm1: TaskModel = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1))
 +    val cm1: ContainerModel = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1))
 +
 +    val cms: util.Map[String, ContainerModel] = new util.HashMap[String, ContainerModel]
 +    cms.put(cm0.getProcessorId, cm0)
 +    cms.put(cm1.getProcessorId, cm1)
 +    val jobModel: JobModel = new JobModel(new MapConfig, cms, null)
 +    val streamToTasks: Multimap[SystemStream, String] = TaskInstance.buildInputToTasks(jobModel)
 +    // stream0 is read by task0 (ssp0) and task1 (ssp1); stream1 only by task0 (ssp2). JUnit order: expected, actual.
 +    assertEquals(2, streamToTasks.get(ssp0.getSystemStream).size)
 +    assertEquals(1, streamToTasks.get(ssp2.getSystemStream).size)
 +  }
  }
  
  class MockSystemAdmin extends SystemAdmin {