You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/10/03 22:23:43 UTC
[05/16] samza git commit: Merge branch 'master' into 0.14.0
Merge branch 'master' into 0.14.0
Conflicts:
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/23bfaa8d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/23bfaa8d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/23bfaa8d
Branch: refs/heads/master
Commit: 23bfaa8d117511ecc1e8a71c0d2695def24158af
Parents: bb3007d bf4c761
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Wed Aug 9 10:39:55 2017 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Aug 9 10:39:55 2017 -0700
----------------------------------------------------------------------
.gitignore | 2 +-
build.gradle | 20 ++
.../versioned/container/metrics-table.html | 8 +-
.../versioned/jobs/configuration-table.html | 32 +-
.../autoscaling/deployer/ConfigManager.java | 2 +-
samza-azure/README.md | 34 +++
.../main/java/org/apache/samza/AzureClient.java | 70 +++++
.../main/java/org/apache/samza/AzureConfig.java | 73 +++++
.../java/org/apache/samza/AzureException.java | 43 +++
.../main/java/org/apache/samza/BlobUtils.java | 280 +++++++++++++++++
.../java/org/apache/samza/JobModelBundle.java | 61 ++++
.../java/org/apache/samza/LeaseBlobManager.java | 98 ++++++
.../java/org/apache/samza/ProcessorEntity.java | 58 ++++
.../main/java/org/apache/samza/TableUtils.java | 198 ++++++++++++
.../clustermanager/ContainerProcessManager.java | 145 +++++----
.../clustermanager/SamzaApplicationState.java | 8 +
.../org/apache/samza/config/TaskConfigJava.java | 4 +-
.../grouper/stream/GroupByPartition.java | 2 +-
.../grouper/task/GroupByContainerIds.java | 18 +-
.../apache/samza/operators/StreamGraphImpl.java | 4 +-
.../samza/operators/impl/OperatorImpl.java | 4 +-
.../samza/operators/impl/OperatorImplGraph.java | 5 +-
.../apache/samza/processor/StreamProcessor.java | 103 ++++---
.../org/apache/samza/task/TaskFactoryUtil.java | 5 +-
.../java/org/apache/samza/zk/ProcessorData.java | 18 +-
.../samza/zk/ScheduleAfterDebounceTime.java | 8 +
.../samza/zk/ZkBarrierForVersionUpgrade.java | 33 +-
.../org/apache/samza/zk/ZkControllerImpl.java | 42 ++-
.../samza/zk/ZkCoordinationServiceFactory.java | 6 +-
.../apache/samza/zk/ZkCoordinationUtils.java | 12 +-
.../org/apache/samza/zk/ZkJobCoordinator.java | 150 +++++++---
.../samza/zk/ZkJobCoordinatorFactory.java | 22 +-
.../samza/zk/ZkJobCoordinatorMetrics.java | 9 -
.../org/apache/samza/zk/ZkLeaderElector.java | 19 +-
.../main/java/org/apache/samza/zk/ZkUtils.java | 300 +++++++++++++++++--
.../org/apache/samza/zk/ZkUtilsMetrics.java | 56 ++++
.../org/apache/samza/config/JobConfig.scala | 2 +-
.../apache/samza/container/SamzaContainer.scala | 4 +-
.../ContainerProcessManagerMetrics.scala | 1 +
.../samza/serializers/ByteBufferSerde.scala | 5 +-
.../apache/samza/system/SystemConsumers.scala | 2 +-
.../clustermanager/MockContainerAllocator.java | 23 ++
.../TestContainerProcessManager.java | 207 +++++++++++--
.../grouper/task/TestGroupByContainerIds.java | 31 +-
.../samza/operators/TestJoinOperator.java | 2 +-
.../operators/impl/TestOperatorImplGraph.java | 25 +-
.../apache/samza/task/TestTaskFactoryUtil.java | 11 +
.../zk/TestZkBarrierForVersionUpgrade.java | 4 +-
.../apache/samza/zk/TestZkJobCoordinator.java | 49 +++
.../apache/samza/zk/TestZkLeaderElector.java | 2 +-
.../apache/samza/zk/TestZkProcessorLatch.java | 4 +-
.../java/org/apache/samza/zk/TestZkUtils.java | 148 ++++++++-
.../samza/serializers/TestByteBufferSerde.scala | 24 +-
.../hdfs/reader/TestMultiFileHdfsReader.java | 6 +-
.../system/kafka/KafkaSystemConsumer.scala | 2 +-
.../storage/kv/KeyValueStorageEngine.scala | 2 +-
.../samza/rest/proxy/job/ScriptJobProxy.java | 1 +
.../apache/samza/rest/script/ScriptRunner.java | 29 +-
.../samza/processor/TestZkStreamProcessor.java | 251 ++++++++++++++++
.../processor/TestZkStreamProcessorBase.java | 88 ++++--
.../TestZkStreamProcessorFailures.java | 151 ++++++++++
.../processor/TestZkStreamProcessorSession.java | 135 +++++++++
.../processor/TestZkLocalApplicationRunner.java | 242 ++++++++++-----
.../test/processor/TestZkStreamProcessor.java | 248 ---------------
.../TestZkStreamProcessorFailures.java | 147 ---------
settings.gradle | 3 +-
66 files changed, 2975 insertions(+), 826 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/23bfaa8d/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/23bfaa8d/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/23bfaa8d/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --cc samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 653c0bb,415111f..70be208
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@@ -67,10 -69,10 +68,10 @@@ public class StreamProcessor
private volatile SamzaContainer container = null;
private volatile Throwable containerException = null;
--
++
// Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
// stopped due to re-balancing
- private volatile CountDownLatch jcContainerShutdownLatch = new CountDownLatch(1);
+ /* package private */volatile CountDownLatch jcContainerShutdownLatch;
private volatile boolean processorOnStartCalled = false;
@VisibleForTesting
@@@ -233,62 -244,59 +243,57 @@@
@Override
public void onNewJobModel(String processorId, JobModel jobModel) {
- if (!jobModel.getContainers().containsKey(processorId)) {
- LOGGER.warn("JobModel does not contain the processorId: " + processorId + ". Stopping the processor.");
- stop();
- } else {
- jcContainerShutdownLatch = new CountDownLatch(1);
-
- SamzaContainerListener containerListener = new SamzaContainerListener() {
- @Override
- public void onContainerStart() {
- if (!processorOnStartCalled) {
- // processorListener is called on start only the first time the container starts.
- // It is not called after every re-balance of partitions among the processors
- processorOnStartCalled = true;
- if (processorListener != null) {
- processorListener.onStart();
- }
- } else {
- LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
- }
- }
+ jcContainerShutdownLatch = new CountDownLatch(1);
- @Override
- public void onContainerStop(boolean pauseByJm) {
- if (pauseByJm) {
- LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
- if (jcContainerShutdownLatch != null) {
- jcContainerShutdownLatch.countDown();
- }
- } else { // sp.stop was called or container stopped by itself
- LOGGER.info("Container " + container.toString() + " stopped.");
- container = null; // this guarantees that stop() doesn't try to stop container again
- stop();
+ SamzaContainerListener containerListener = new SamzaContainerListener() {
+ @Override
+ public void onContainerStart() {
+ if (!processorOnStartCalled) {
+ // processorListener is called on start only the first time the container starts.
+ // It is not called after every re-balance of partitions among the processors
+ processorOnStartCalled = true;
+ if (processorListener != null) {
+ processorListener.onStart();
}
+ } else {
+ LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
}
+ }
- @Override
- public void onContainerFailed(Throwable t) {
+ @Override
+ public void onContainerStop(boolean pauseByJm) {
+ if (pauseByJm) {
+ LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
if (jcContainerShutdownLatch != null) {
jcContainerShutdownLatch.countDown();
- } else {
- LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
}
- containerException = t;
- LOGGER.error("Container failed. Stopping the processor.", containerException);
- container = null;
+ } else { // sp.stop was called or container stopped by itself
+ LOGGER.info("Container " + container.toString() + " stopped.");
+ container = null; // this guarantees that stop() doesn't try to stop container again
stop();
}
- };
+ }
- container = createSamzaContainer(processorId, jobModel);
- container.setContainerListener(containerListener);
- LOGGER.info("Starting container " + container.toString());
- executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("p-" + processorId + "-container-thread-%d").build());
- executorService.submit(container::run);
- }
+ @Override
+ public void onContainerFailed(Throwable t) {
+ if (jcContainerShutdownLatch != null) {
+ jcContainerShutdownLatch.countDown();
+ } else {
+ LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
+ }
+ containerException = t;
+ LOGGER.error("Container failed. Stopping the processor.", containerException);
+ container = null;
+ stop();
+ }
+ };
+
- container = createSamzaContainer(
- jobModel.getContainers().get(processorId),
- jobModel.maxChangeLogStreamPartitions);
++ container = createSamzaContainer(processorId, jobModel);
+ container.setContainerListener(containerListener);
+ LOGGER.info("Starting container " + container.toString());
+ executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("p-" + processorId + "-container-thread-%d").build());
+ executorService.submit(container::run);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/23bfaa8d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------