You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/10/03 22:23:45 UTC
[07/16] samza git commit: Merge branch 'master' into 0.14.0
Merge branch 'master' into 0.14.0
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1701ea84
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1701ea84
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1701ea84
Branch: refs/heads/master
Commit: 1701ea84a2e029b3297687b5fc814998371b1a6f
Parents: 79200c7 fb39a51
Author: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Authored: Tue Sep 12 11:32:36 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Committed: Tue Sep 12 11:32:36 2017 -0700
----------------------------------------------------------------------
KEYS | 57 +++
NOTICE | 1 +
README.md | 2 +-
bin/check-all.sh | 2 +-
build.gradle | 4 +-
docs/contribute/tests.md | 2 +-
.../versioned/jobs/configuration-table.html | 14 +-
.../versioned/hello-samza-high-level-yarn.md | 2 +-
.../versioned/hello-samza-high-level-zk.md | 2 +-
.../versioned/samza-rest-getting-started.md | 2 +-
docs/startup/download/index.md | 17 +-
docs/startup/hello-samza/versioned/index.md | 2 +-
docs/startup/preview/index.md | 2 +-
.../main/java/org/apache/samza/AzureClient.java | 20 +-
.../main/java/org/apache/samza/AzureConfig.java | 73 ---
.../main/java/org/apache/samza/BlobUtils.java | 280 ----------
.../java/org/apache/samza/JobModelBundle.java | 61 ---
.../java/org/apache/samza/LeaseBlobManager.java | 98 ----
.../java/org/apache/samza/ProcessorEntity.java | 58 ---
.../main/java/org/apache/samza/TableUtils.java | 198 --------
.../org/apache/samza/config/AzureConfig.java | 68 +++
.../coordinator/AzureCoordinationUtils.java | 58 +++
.../AzureCoordinationUtilsFactory.java | 30 ++
.../samza/coordinator/AzureJobCoordinator.java | 509 +++++++++++++++++++
.../coordinator/AzureJobCoordinatorFactory.java | 29 ++
.../samza/coordinator/AzureLeaderElector.java | 109 ++++
.../org/apache/samza/coordinator/AzureLock.java | 100 ++++
.../samza/coordinator/DistributedLock.java | 39 ++
.../samza/coordinator/data/BarrierState.java | 27 +
.../samza/coordinator/data/JobModelBundle.java | 61 +++
.../samza/coordinator/data/ProcessorEntity.java | 62 +++
.../scheduler/HeartbeatScheduler.java | 81 +++
.../scheduler/JMVersionUpgradeScheduler.java | 99 ++++
.../LeaderBarrierCompleteScheduler.java | 118 +++++
.../scheduler/LeaderLivenessCheckScheduler.java | 120 +++++
.../scheduler/LivenessCheckScheduler.java | 108 ++++
.../scheduler/RenewLeaseScheduler.java | 79 +++
.../scheduler/SchedulerStateChangeListener.java | 29 ++
.../coordinator/scheduler/TaskScheduler.java | 35 ++
.../java/org/apache/samza/util/BlobUtils.java | 284 +++++++++++
.../org/apache/samza/util/LeaseBlobManager.java | 99 ++++
.../java/org/apache/samza/util/TableUtils.java | 205 ++++++++
.../samza/config/JobCoordinatorConfig.java | 21 +
.../apache/samza/container/LocalityManager.java | 6 +-
.../coordinator/CoordinationServiceFactory.java | 36 --
.../samza/coordinator/CoordinationUtils.java | 14 +-
.../coordinator/CoordinationUtilsFactory.java | 47 ++
.../coordinator/DistributedLockWithState.java | 42 ++
.../samza/runtime/LocalApplicationRunner.java | 91 ++--
.../org/apache/samza/task/AsyncRunLoop.java | 14 +
.../samza/zk/ZkBarrierForVersionUpgrade.java | 2 +-
.../org/apache/samza/zk/ZkControllerImpl.java | 19 +-
.../samza/zk/ZkCoordinationServiceFactory.java | 89 ----
.../apache/samza/zk/ZkCoordinationUtils.java | 26 +-
.../samza/zk/ZkCoordinationUtilsFactory.java | 89 ++++
.../org/apache/samza/zk/ZkDistributedLock.java | 117 +++++
.../samza/zk/ZkJobCoordinatorFactory.java | 23 +-
.../org/apache/samza/zk/ZkLeaderElector.java | 2 +-
.../org/apache/samza/zk/ZkProcessorLatch.java | 23 +-
.../main/java/org/apache/samza/zk/ZkUtils.java | 41 +-
.../apache/samza/checkpoint/OffsetManager.scala | 50 +-
.../apache/samza/container/TaskInstance.scala | 4 +-
.../runtime/TestApplicationRunnerMain.java | 2 +-
.../runtime/TestLocalApplicationRunner.java | 185 ++++---
.../org/apache/samza/task/TestAsyncRunLoop.java | 24 +-
.../apache/samza/zk/TestZkLeaderElector.java | 7 +-
.../org/apache/samza/zk/TestZkNamespace.java | 8 +-
.../apache/samza/zk/TestZkProcessorLatch.java | 2 +-
.../java/org/apache/samza/zk/TestZkUtils.java | 57 ++-
.../samza/checkpoint/TestOffsetManager.scala | 58 ++-
.../samza/config/TestJobCoordinatorConfig.java | 58 +++
.../samza/container/TestTaskInstance.scala | 62 ++-
.../org/apache/samza/rest/SamzaRestService.java | 14 +-
.../processor/TestZkStreamProcessorSession.java | 3 +-
.../processor/TestZkLocalApplicationRunner.java | 60 ++-
.../test/integration/TestStatefulTask.scala | 79 ++-
settings.gradle | 17 +
77 files changed, 3444 insertions(+), 1194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --cc samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 4be4e73,f9c1252..5b2c661
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@@ -19,7 -19,13 +19,14 @@@
package org.apache.samza.runtime;
+ import com.google.common.collect.ImmutableList;
+ import java.lang.reflect.Field;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.stream.Collectors;
+import java.util.Set;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.JobConfig;
@@@ -343,8 -324,73 +325,77 @@@ public class TestLocalApplicationRunne
assertEquals(spy.status(app), ApplicationStatus.UnsuccessfulFinish);
}
+ public static Set<StreamProcessor> getProcessors(LocalApplicationRunner runner) {
+ return runner.getProcessors();
+ }
+
+ /**
+ * A test case to verify that the plan results in a different hash if there is a change in the topological sort order.
+ * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases.
+ */
+ @Test
+ public void testPlanIdWithShuffledStreamSpecs() {
+ List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+ String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
+
+ List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
+
+ assertFalse("Expected both of the latch ids to be different",
+ planIdBeforeShuffle.equals(getExecutionPlanId(shuffledStreamSpecs)));
+ }
+
+ /**
+ * A test case to verify that the same plan always results in the same hash.
+ * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases.
+ */
+ @Test
+ public void testGeneratePlanIdWithSameStreamSpecs() {
+ List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+ String planIdForFirstAttempt = getExecutionPlanId(streamSpecs);
+ String planIdForSecondAttempt = getExecutionPlanId(streamSpecs);
+
+ assertEquals("Expected latch ids to match!", "1447946713", planIdForFirstAttempt);
+ assertEquals("Expected latch ids to match for the second attempt!", planIdForFirstAttempt, planIdForSecondAttempt);
+ }
+
+ /**
+ * A test case to verify that the plan results in a different hash when an intermediate stream differs.
+ * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases.
+ */
+ @Test
+ public void testGeneratePlanIdWithDifferentStreamSpecs() {
+ List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+ String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
+
+ List<StreamSpec> updatedStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-4", "stream-4", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
+
+ assertFalse("Expected both of the latch ids to be different",
+ planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs)));
+ }
+
+ private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
+ String intermediateStreamJson =
+ updatedStreamSpecs.stream().map(this::streamSpecToJson).collect(Collectors.joining(","));
+
+ int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode();
+
+ return String.valueOf(planId);
+ }
+
+ private String streamSpecToJson(StreamSpec streamSpec) {
+ return String.format(STREAM_SPEC_JSON_FORMAT, streamSpec.getId(), streamSpec.getId(), streamSpec.getSystemName(),
+ streamSpec.getPhysicalName());
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1701ea84/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --cc samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index dcb06d3,4958a57..81f3ed1
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@@ -19,34 -19,16 +19,32 @@@
package org.apache.samza.container
+
+import java.util
+import java.util
+import java.util.Collections
import java.util.concurrent.ConcurrentHashMap
+import com.google.common.collect.Multimap
+import org.apache.samza.SamzaException
+
import org.apache.samza.Partition
+import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.config.Config
+import org.apache.samza.config.MapConfig
+import org.apache.samza.control.ControlMessageUtils
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.TaskModel
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.Metric
+import org.apache.samza.metrics.MetricsRegistryMap
+ import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
+ import org.apache.samza.config.{Config, MapConfig}
+ import org.apache.samza.metrics.{Counter, Metric, MetricsRegistryMap}
import org.apache.samza.serializers.SerdeManager
- import org.apache.samza.system.IncomingMessageEnvelope
- import org.apache.samza.system.SystemConsumer
- import org.apache.samza.system.SystemConsumers
- import org.apache.samza.system.SystemProducer
- import org.apache.samza.system.SystemProducers
- import org.apache.samza.system.SystemStream
- import org.apache.samza.system.SystemStreamMetadata
+ import org.apache.samza.storage.TaskStorageManager
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
- import org.apache.samza.system.SystemStreamPartition
+ import org.apache.samza.system._
import org.apache.samza.system.chooser.RoundRobinChooser
import org.apache.samza.task._
import org.junit.Assert._
@@@ -365,34 -350,47 +366,77 @@@ class TestTaskInstance
}
@Test
+ def testCommitOrder {
+ // Simple objects
+ val partition = new Partition(0)
+ val taskName = new TaskName("taskName")
+ val systemStream = new SystemStream("test-system", "test-stream")
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val checkpoint = new Checkpoint(Map(systemStreamPartition -> "4").asJava)
+
+ // Mocks
+ val collector = Mockito.mock(classOf[TaskInstanceCollector])
+ val storageManager = Mockito.mock(classOf[TaskStorageManager])
+ val offsetManager = Mockito.mock(classOf[OffsetManager])
+ when(offsetManager.buildCheckpoint(any())).thenReturn(checkpoint)
+ val mockOrder = inOrder(offsetManager, collector, storageManager)
+
+ val taskInstance: TaskInstance = new TaskInstance(
+ Mockito.mock(classOf[StreamTask]).asInstanceOf[StreamTask],
+ taskName,
+ new MapConfig,
+ new TaskInstanceMetrics,
+ null,
+ Mockito.mock(classOf[SystemConsumers]),
+ collector,
+ Mockito.mock(classOf[SamzaContainerContext]),
+ offsetManager,
+ storageManager,
+ systemStreamPartitions = Set(systemStreamPartition))
+
+ taskInstance.commit
+
+ // We must first get a snapshot of the checkpoint so it doesn't change while we flush. SAMZA-1384
+ mockOrder.verify(offsetManager).buildCheckpoint(taskName)
+ // Producers must be flushed next, and ideally the output would be flushed before the changelog
+ // so that the changelog and checkpoints (state and inputs) are captured last
+ mockOrder.verify(collector).flush
+ // Local state is next, to ensure that the state (particularly the offset file) never points to a newer changelog
+ // offset than what is reflected in the on-disk state.
+ mockOrder.verify(storageManager).flush()
+ // Finally, checkpoint the inputs with the snapshotted checkpoint captured at the beginning of commit
+ mockOrder.verify(offsetManager).writeCheckpoint(taskName, checkpoint)
+ }
++
++ @Test
+ def testBuildInputToTasks = {
+ val system: String = "test-system"
+ val stream0: String = "test-stream-0"
+ val stream1: String = "test-stream-1"
+
+ val ssp0: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(0))
+ val ssp1: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(1))
+ val ssp2: SystemStreamPartition = new SystemStreamPartition(system, stream1, new Partition(0))
+
+ val task0: TaskName = new TaskName("Task 0")
+ val task1: TaskName = new TaskName("Task 1")
+ val ssps: util.Set[SystemStreamPartition] = new util.HashSet[SystemStreamPartition]
+ ssps.add(ssp0)
+ ssps.add(ssp2)
+ val tm0: TaskModel = new TaskModel(task0, ssps, new Partition(0))
+ val cm0: ContainerModel = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0))
+ val tm1: TaskModel = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1))
+ val cm1: ContainerModel = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1))
+
+ val cms: util.Map[String, ContainerModel] = new util.HashMap[String, ContainerModel]
+ cms.put(cm0.getProcessorId, cm0)
+ cms.put(cm1.getProcessorId, cm1)
+
+ val jobModel: JobModel = new JobModel(new MapConfig, cms, null)
+ val streamToTasks: Multimap[SystemStream, String] = TaskInstance.buildInputToTasks(jobModel)
+ assertEquals(streamToTasks.get(ssp0.getSystemStream).size, 2)
+ assertEquals(streamToTasks.get(ssp2.getSystemStream).size, 1)
+ }
}
class MockSystemAdmin extends SystemAdmin {