You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/10/03 22:23:43 UTC

[05/16] samza git commit: Merge branch 'master' into 0.14.0

Merge branch 'master' into 0.14.0

Conflicts:
	samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/23bfaa8d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/23bfaa8d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/23bfaa8d

Branch: refs/heads/master
Commit: 23bfaa8d117511ecc1e8a71c0d2695def24158af
Parents: bb3007d bf4c761
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Wed Aug 9 10:39:55 2017 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Aug 9 10:39:55 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |   2 +-
 build.gradle                                    |  20 ++
 .../versioned/container/metrics-table.html      |   8 +-
 .../versioned/jobs/configuration-table.html     |  32 +-
 .../autoscaling/deployer/ConfigManager.java     |   2 +-
 samza-azure/README.md                           |  34 +++
 .../main/java/org/apache/samza/AzureClient.java |  70 +++++
 .../main/java/org/apache/samza/AzureConfig.java |  73 +++++
 .../java/org/apache/samza/AzureException.java   |  43 +++
 .../main/java/org/apache/samza/BlobUtils.java   | 280 +++++++++++++++++
 .../java/org/apache/samza/JobModelBundle.java   |  61 ++++
 .../java/org/apache/samza/LeaseBlobManager.java |  98 ++++++
 .../java/org/apache/samza/ProcessorEntity.java  |  58 ++++
 .../main/java/org/apache/samza/TableUtils.java  | 198 ++++++++++++
 .../clustermanager/ContainerProcessManager.java | 145 +++++----
 .../clustermanager/SamzaApplicationState.java   |   8 +
 .../org/apache/samza/config/TaskConfigJava.java |   4 +-
 .../grouper/stream/GroupByPartition.java        |   2 +-
 .../grouper/task/GroupByContainerIds.java       |  18 +-
 .../apache/samza/operators/StreamGraphImpl.java |   4 +-
 .../samza/operators/impl/OperatorImpl.java      |   4 +-
 .../samza/operators/impl/OperatorImplGraph.java |   5 +-
 .../apache/samza/processor/StreamProcessor.java | 103 ++++---
 .../org/apache/samza/task/TaskFactoryUtil.java  |   5 +-
 .../java/org/apache/samza/zk/ProcessorData.java |  18 +-
 .../samza/zk/ScheduleAfterDebounceTime.java     |   8 +
 .../samza/zk/ZkBarrierForVersionUpgrade.java    |  33 +-
 .../org/apache/samza/zk/ZkControllerImpl.java   |  42 ++-
 .../samza/zk/ZkCoordinationServiceFactory.java  |   6 +-
 .../apache/samza/zk/ZkCoordinationUtils.java    |  12 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 150 +++++++---
 .../samza/zk/ZkJobCoordinatorFactory.java       |  22 +-
 .../samza/zk/ZkJobCoordinatorMetrics.java       |   9 -
 .../org/apache/samza/zk/ZkLeaderElector.java    |  19 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 300 +++++++++++++++++--
 .../org/apache/samza/zk/ZkUtilsMetrics.java     |  56 ++++
 .../org/apache/samza/config/JobConfig.scala     |   2 +-
 .../apache/samza/container/SamzaContainer.scala |   4 +-
 .../ContainerProcessManagerMetrics.scala        |   1 +
 .../samza/serializers/ByteBufferSerde.scala     |   5 +-
 .../apache/samza/system/SystemConsumers.scala   |   2 +-
 .../clustermanager/MockContainerAllocator.java  |  23 ++
 .../TestContainerProcessManager.java            | 207 +++++++++++--
 .../grouper/task/TestGroupByContainerIds.java   |  31 +-
 .../samza/operators/TestJoinOperator.java       |   2 +-
 .../operators/impl/TestOperatorImplGraph.java   |  25 +-
 .../apache/samza/task/TestTaskFactoryUtil.java  |  11 +
 .../zk/TestZkBarrierForVersionUpgrade.java      |   4 +-
 .../apache/samza/zk/TestZkJobCoordinator.java   |  49 +++
 .../apache/samza/zk/TestZkLeaderElector.java    |   2 +-
 .../apache/samza/zk/TestZkProcessorLatch.java   |   4 +-
 .../java/org/apache/samza/zk/TestZkUtils.java   | 148 ++++++++-
 .../samza/serializers/TestByteBufferSerde.scala |  24 +-
 .../hdfs/reader/TestMultiFileHdfsReader.java    |   6 +-
 .../system/kafka/KafkaSystemConsumer.scala      |   2 +-
 .../storage/kv/KeyValueStorageEngine.scala      |   2 +-
 .../samza/rest/proxy/job/ScriptJobProxy.java    |   1 +
 .../apache/samza/rest/script/ScriptRunner.java  |  29 +-
 .../samza/processor/TestZkStreamProcessor.java  | 251 ++++++++++++++++
 .../processor/TestZkStreamProcessorBase.java    |  88 ++++--
 .../TestZkStreamProcessorFailures.java          | 151 ++++++++++
 .../processor/TestZkStreamProcessorSession.java | 135 +++++++++
 .../processor/TestZkLocalApplicationRunner.java | 242 ++++++++++-----
 .../test/processor/TestZkStreamProcessor.java   | 248 ---------------
 .../TestZkStreamProcessorFailures.java          | 147 ---------
 settings.gradle                                 |   3 +-
 66 files changed, 2975 insertions(+), 826 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/23bfaa8d/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/23bfaa8d/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/23bfaa8d/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --cc samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 653c0bb,415111f..70be208
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@@ -67,10 -69,10 +68,10 @@@ public class StreamProcessor 
  
    private volatile SamzaContainer container = null;
    private volatile Throwable containerException = null;
--  
++
    // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
    // stopped due to re-balancing
-   private volatile CountDownLatch jcContainerShutdownLatch = new CountDownLatch(1);
+   /* package private */volatile CountDownLatch jcContainerShutdownLatch;
    private volatile boolean processorOnStartCalled = false;
  
    @VisibleForTesting
@@@ -233,62 -244,59 +243,57 @@@
  
        @Override
        public void onNewJobModel(String processorId, JobModel jobModel) {
-         if (!jobModel.getContainers().containsKey(processorId)) {
-           LOGGER.warn("JobModel does not contain the processorId: " + processorId + ". Stopping the processor.");
-           stop();
-         } else {
-           jcContainerShutdownLatch = new CountDownLatch(1);
- 
-           SamzaContainerListener containerListener = new SamzaContainerListener() {
-             @Override
-             public void onContainerStart() {
-               if (!processorOnStartCalled) {
-                 // processorListener is called on start only the first time the container starts.
-                 // It is not called after every re-balance of partitions among the processors
-                 processorOnStartCalled = true;
-                 if (processorListener != null) {
-                   processorListener.onStart();
-                 }
-               } else {
-                 LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
-               }
-             }
+         jcContainerShutdownLatch = new CountDownLatch(1);
  
-             @Override
-             public void onContainerStop(boolean pauseByJm) {
-               if (pauseByJm) {
-                 LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
-                 if (jcContainerShutdownLatch != null) {
-                   jcContainerShutdownLatch.countDown();
-                 }
-               } else {  // sp.stop was called or container stopped by itself
-                 LOGGER.info("Container " + container.toString() + " stopped.");
-                 container = null; // this guarantees that stop() doesn't try to stop container again
-                 stop();
+         SamzaContainerListener containerListener = new SamzaContainerListener() {
+           @Override
+           public void onContainerStart() {
+             if (!processorOnStartCalled) {
+               // processorListener is called on start only the first time the container starts.
+               // It is not called after every re-balance of partitions among the processors
+               processorOnStartCalled = true;
+               if (processorListener != null) {
+                 processorListener.onStart();
                }
+             } else {
+               LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
              }
+           }
  
-             @Override
-             public void onContainerFailed(Throwable t) {
+           @Override
+           public void onContainerStop(boolean pauseByJm) {
+             if (pauseByJm) {
+               LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
                if (jcContainerShutdownLatch != null) {
                  jcContainerShutdownLatch.countDown();
-               } else {
-                 LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
                }
-               containerException = t;
-               LOGGER.error("Container failed. Stopping the processor.", containerException);
-               container = null;
+             } else {  // sp.stop was called or container stopped by itself
+               LOGGER.info("Container " + container.toString() + " stopped.");
+               container = null; // this guarantees that stop() doesn't try to stop container again
                stop();
              }
-           };
+           }
  
-           container = createSamzaContainer(processorId, jobModel);
-           container.setContainerListener(containerListener);
-           LOGGER.info("Starting container " + container.toString());
-           executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-               .setNameFormat("p-" + processorId + "-container-thread-%d").build());
-           executorService.submit(container::run);
-         }
+           @Override
+           public void onContainerFailed(Throwable t) {
+             if (jcContainerShutdownLatch != null) {
+               jcContainerShutdownLatch.countDown();
+             } else {
+               LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
+             }
+             containerException = t;
+             LOGGER.error("Container failed. Stopping the processor.", containerException);
+             container = null;
+             stop();
+           }
+         };
+ 
 -        container = createSamzaContainer(
 -            jobModel.getContainers().get(processorId),
 -            jobModel.maxChangeLogStreamPartitions);
++        container = createSamzaContainer(processorId, jobModel);
+         container.setContainerListener(containerListener);
+         LOGGER.info("Starting container " + container.toString());
+         executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+             .setNameFormat("p-" + processorId + "-container-thread-%d").build());
+         executorService.submit(container::run);
        }
  
        @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/23bfaa8d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------