Posted to commits@samza.apache.org by ja...@apache.org on 2018/11/27 09:48:33 UTC

[26/50] samza git commit: fix deployment models

fix deployment models


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/424b042d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/424b042d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/424b042d

Branch: refs/heads/master
Commit: 424b042dd4e394c4a80ddfdfa4905519a87b3038
Parents: 456e697
Author: Jagadish <jv...@linkedin.com>
Authored: Wed Oct 24 14:34:03 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Oct 24 14:34:03 2018 -0700

----------------------------------------------------------------------
 .../versioned/deployment/deployment-model.md    | 36 +++++++-------------
 docs/learn/documentation/versioned/index.html   |  2 +-
 .../task/SingleContainerGrouperFactory.java     | 13 +++++--
 .../standalone/PassthroughJobCoordinator.java   |  9 ++++-
 .../EndOfStreamIntegrationTest.java             |  2 +-
 .../WatermarkIntegrationTest.java               |  2 +-
 6 files changed, 35 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/424b042d/docs/learn/documentation/versioned/deployment/deployment-model.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/deployment/deployment-model.md b/docs/learn/documentation/versioned/deployment/deployment-model.md
index 9192278..81de2eb 100644
--- a/docs/learn/documentation/versioned/deployment/deployment-model.md
+++ b/docs/learn/documentation/versioned/deployment/deployment-model.md
@@ -19,40 +19,30 @@ title: Deployment model
    limitations under the License.
 -->
 
-# Overview
-One unique thing about Samza is that it provides multiple ways to deploy an application. Each deployment model comes with its own benefits, so you have flexibility in being able to choose which model best fits your needs. Samza supports “write once, run anywhere”, so application logic is the same regardless of the deployment model that you choose.
+### Overview
+One unique thing about Samza is that it provides multiple ways to deploy an application. Each deployment model comes with its own benefits, so you have flexibility in choosing which model best fits your needs. Samza supports “write once, run anywhere”, so application logic is the same regardless of where you choose to deploy your application.
 
-## YARN
-Apache YARN is a technology that manages resources, deploys applications, and monitors applications for a cluster of machines. Samza submits an application to YARN, and YARN assigns resources from across the cluster to that application. Multiple applications can run on a single YARN cluster.
+### Running Samza on YARN
+Samza integrates with Apache YARN to run stream processing as a managed service. Samza relies on YARN for multi-tenancy, resource management, isolation and deployment of your applications. In this mode, you write your Samza application and submit it to be scheduled on a YARN cluster. You also specify its resource requirements: the number of containers, and the number of cores and amount of memory per container. Samza then works with YARN to provision resources for your application and run it across a cluster of machines. It also handles failures of individual instances and restarts them.
 
-* Provides central cluster management
-* Each application has an associated application master in YARN to coordinate processing containers
-* Enforces CPU and memory limits
-* Supports multi-tenancy for applications
-* A Samza application is run directly as its own set of processes
-* Automatically restarts containers that have failed
-* Provides centrally managed tools and dashboards
+When multiple applications share the same YARN cluster, they need to be isolated from each other. For this purpose, Samza works with YARN to enforce CPU and memory limits. Any application that uses more than its requested share of memory or CPU is terminated, which is what makes multi-tenancy possible. As with any YARN-based application, you can use YARN's web UI to manage your Samza jobs, view their logs, and so on.
 
-## Standalone
+### Running Samza in standalone mode
 
-In standalone mode, a Samza application is a library embedded into another application (similar to Kafka Streams). This means that an application owner can control the full lifecycle of the application. Samza will do the coordination between processing containers to ensure that processing is balanced and failures are handled.
+Often you want to embed Samza as a component in a larger application. To enable this, Samza supports a standalone mode of operation. In this mode, Samza can be used like a library within your application. This is very similar to Kafka Streams and offers greater control over the application life cycle. You can increase capacity by spinning up multiple instances. The instances dynamically coordinate among themselves to distribute work. If any instance fails, the tasks running on it are re-assigned to the remaining ones. By default, Samza uses ZooKeeper for coordination across instances. The coordination logic itself is pluggable.
 
-* Application owner is free to control cluster management, CPU and memory limits, and multi-tenancy
-* Container coordination is done by Zookeeper out of the box, and container coordination can be extended to be done by a technology other than Zookeeper
-* If containers fail, then partitions will be rebalanced across remaining containers
-* Samza logic can run within the same process as non-Samza logic
-* Application owner can run tools and dashboards wherever the application is deployed
+This mode allows you to run Samza with any cluster manager of your choice, including Kubernetes and Marathon, or in any other hosting environment. You control memory limits and multi-tenancy for your application yourself, since Samza now acts as a lightweight library used by your application.
 
-# Choosing a deployment model
+### Choosing a deployment model
 
-Here are some guidelines when choosing your deployment model.
+A common question we get asked is "Where should I run my Samza application?". Here are some guidelines for choosing your deployment model. Since your application logic does not change, it is easy to port from one deployment model to the other.
 
-* Would you like your Samza application to be embedded as a component of a larger application?
+* Would you like Samza to be embedded as a component of a larger application?
     * If so, then you should use standalone.
 * Would you like to have out-of-the-box resource management (e.g. CPU/memory limits, restarts on failures)?
     * If so, then you should use YARN.
-* Would you like to have the freedom to deploy and run your application anywhere?
+* Would you like to run your application on any other cluster manager, e.g. Kubernetes?
     * If so, then you should use standalone.
 * Would you like to run centrally-managed tools and dashboards?
     * If so, then you should use YARN.
-    * Note: You can still have tools and dashboards when using standalone, but you will need to run them yourself wherever you have actually deployed your application.
+    * Note: You can still have tools and dashboards when using standalone, but you will need to run them yourself wherever your application is deployed.
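
For illustration only (not part of the commit): the revised document says the application logic stays the same and only the deployment model changes. A minimal sketch of what that could look like at the configuration level follows. The class and method names are hypothetical, the values are placeholders, and the property keys are assumed from the Samza configuration reference, so they should be checked against the version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper class; it only builds plain maps and does not call Samza.
public class DeploymentConfigSketch {

  // Properties a YARN deployment would typically set (assumed keys, placeholder values).
  static Map<String, String> yarnConfig() {
    Map<String, String> config = new LinkedHashMap<>();
    config.put("job.factory.class", "org.apache.samza.job.yarn.YarnJobFactory");
    config.put("job.container.count", "4");                 // number of containers
    config.put("cluster-manager.container.cpu.cores", "1"); // cores per container
    config.put("cluster-manager.container.memory.mb", "1024"); // memory per container
    return config;
  }

  // Properties a standalone deployment coordinated through ZooKeeper would typically set.
  static Map<String, String> standaloneConfig() {
    Map<String, String> config = new LinkedHashMap<>();
    config.put("job.coordinator.factory", "org.apache.samza.zk.ZkJobCoordinatorFactory");
    config.put("job.coordinator.zk.connect", "localhost:2181"); // placeholder address
    return config;
  }

  public static void main(String[] args) {
    // The same application would be started with either map; only these entries differ.
    System.out.println("YARN:       " + yarnConfig());
    System.out.println("Standalone: " + standaloneConfig());
  }
}
```

In the standalone case, resource limits do not appear in the Samza configuration at all, which matches the document's point that the hosting environment enforces them.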

http://git-wip-us.apache.org/repos/asf/samza/blob/424b042d/docs/learn/documentation/versioned/index.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/index.html b/docs/learn/documentation/versioned/index.html
index 50bfd2d..893e428 100644
--- a/docs/learn/documentation/versioned/index.html
+++ b/docs/learn/documentation/versioned/index.html
@@ -21,7 +21,7 @@ title: Documentation
 
 <h4><a href="core-concepts/core-concepts.html">CORE CONCEPTS</a></h4>
 <h4><a href="architecture/architecture-overview.html">ARCHITECTURE</a></h4>
-<h4><a href="jobs/configuration.html">CONFIGURATIONS</a></h4>
+<h4><a href="jobs/samza-configurations.html">CONFIGURATIONS</a></h4>
 
 <h4>API</h4>
 

http://git-wip-us.apache.org/repos/asf/samza/blob/424b042d/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
index ee962d5..90c031a 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.container.grouper.task;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.container.TaskName;
@@ -27,6 +29,7 @@ import org.apache.samza.job.model.TaskModel;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -41,16 +44,22 @@ class SingleContainerGrouper implements TaskNameGrouper {
   private final String containerId;
 
   SingleContainerGrouper(String containerId) {
-    this.containerId = containerId;
+    this.containerId = null;
   }
 
   @Override
   public Set<ContainerModel> group(Set<TaskModel> taskModels) {
+    return group(taskModels, ImmutableList.of(this.containerId));
+  }
+
+  @Override
+  public Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> containersIds) {
+    Preconditions.checkState(containersIds.size() == 1);
     Map<TaskName, TaskModel> taskNameTaskModelMap = new HashMap<>();
     for (TaskModel taskModel: taskModels) {
       taskNameTaskModelMap.put(taskModel.getTaskName(), taskModel);
     }
-    ContainerModel containerModel = new ContainerModel(containerId, taskNameTaskModelMap);
+    ContainerModel containerModel = new ContainerModel(containersIds.get(0), taskNameTaskModelMap);
     return Collections.singleton(containerModel);
   }
 }
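
For illustration only (not part of the commit): the grouping logic in the hunk above can be sketched without Samza types. This is a simplified stand-in, assuming task names and container ids are plain strings; `SingleContainerGroupingSketch` and its `group` method are hypothetical names. The sketch also rejects a null container id, since Guava's `ImmutableList.of` does not accept null elements.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a single-container task grouper, using strings
// in place of Samza's TaskModel and ContainerModel.
public class SingleContainerGroupingSketch {

  // Assigns every task to the one container id supplied by the caller.
  static Map<String, Set<String>> group(Set<String> taskNames, List<String> containerIds) {
    if (containerIds.size() != 1) {
      throw new IllegalStateException(
          "expected exactly one container id, got " + containerIds.size());
    }
    String containerId = Objects.requireNonNull(containerIds.get(0), "container id");
    // Copy the task names so later changes by the caller do not leak into the result.
    Set<String> tasks = Collections.unmodifiableSet(new TreeSet<>(taskNames));
    return Collections.singletonMap(containerId, tasks);
  }

  public static void main(String[] args) {
    Set<String> tasks = new TreeSet<>(List.of("Partition 0", "Partition 1"));
    System.out.println(group(tasks, List.of("processor-1")));
  }
}
```

Passing the container id as an argument, rather than fixing it at construction time, lets the caller decide the id at the point where the grouping is computed.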

http://git-wip-us.apache.org/repos/asf/samza/blob/424b042d/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 737ac3e..bcf2085 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -18,11 +18,14 @@
  */
 package org.apache.samza.standalone;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.checkpoint.CheckpointManager;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
@@ -69,7 +72,7 @@ public class PassthroughJobCoordinator implements JobCoordinator {
 
   public PassthroughJobCoordinator(Config config) {
     this.processorId = createProcessorId(config);
-    this.config = config;
+    this.config = new MapConfig(config, ImmutableMap.of(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory"));
   }
 
   @Override
@@ -119,7 +122,11 @@ public class PassthroughJobCoordinator implements JobCoordinator {
     SystemAdmins systemAdmins = new SystemAdmins(config);
     StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance());
     systemAdmins.start();
+<<<<<<< Updated upstream
     String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID()));
+=======
+    String containerId = this.processorId;
+>>>>>>> Stashed changes
 
     /** TODO:
      Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also,

http://git-wip-us.apache.org/repos/asf/samza/blob/424b042d/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 6f381e2..f967672 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -78,7 +78,7 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
     configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
 
     configs.put(JobConfig.JOB_NAME(), "test-eos-job");
-    configs.put(JobConfig.PROCESSOR_ID(), "1");
+    //configs.put(JobConfig.PROCESSOR_ID(), "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
 

http://git-wip-us.apache.org/repos/asf/samza/blob/424b042d/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 74c32b4..088278d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -131,7 +131,7 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
     configs.put("streams.PageView.partitionCount", String.valueOf(PARTITION_COUNT));
 
     configs.put(JobConfig.JOB_NAME(), "test-watermark-job");
-    configs.put(JobConfig.PROCESSOR_ID(), "1");
+    //configs.put(JobConfig.PROCESSOR_ID(), "hello");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());