You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/05/22 16:36:36 UTC

[samza] branch master updated: SAMZA-2172: Make High Level applications respect job.container.thread.pool.size

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c087f80  SAMZA-2172: Make High Level applications respect job.container.thread.pool.size
c087f80 is described below

commit c087f80cfe2ae576f30585894a5d907f7c5b2cee
Author: mynameborat <bh...@gmail.com>
AuthorDate: Wed May 22 09:36:28 2019 -0700

    SAMZA-2172: Make High Level applications respect job.container.thread.pool.size
    
    Refer to https://issues.apache.org/jira/browse/SAMZA-2172 for more details.
    
    `2019-04-25 14:41:17.132 [Samza Container Thread-2] WikipediaAsyncApplication [INFO] Executing blocking filter on the thread Samza Container Thread-2
    2019-04-25 14:41:17.133 [Samza Container Thread-2] WikipediaAsyncApplication [INFO] Finished executing blocking filter on the thread Samza Container Thread-2
    2019-04-25 14:41:17.442 [Samza Container Thread-3] WikipediaAsyncApplication [INFO] Executing blocking filter on the thread Samza Container Thread-3
    2019-04-25 14:41:17.442 [Samza Container Thread-3] WikipediaAsyncApplication [INFO] Finished executing blocking filter on the thread Samza Container Thread-3
    2019-04-25 14:41:17.487 [Samza Container Thread-0] WikipediaAsyncApplication [INFO] Executing blocking filter on the thread Samza Container Thread-0
    `
    
    Author: mynameborat <bh...@gmail.com>
    
    Reviewers: Prateek Maheshwari <pm...@apache.org>
    
    Closes #1011 from mynameborat/async-api-fix
---
 .../org/apache/samza/task/StreamOperatorTask.java  | 99 ++++++++++++++--------
 .../samza/task/StreamOperatorTaskFactory.java      | 30 +++++++
 .../org/apache/samza/task/TaskFactoryUtil.java     | 28 +++++-
 .../apache/samza/container/SamzaContainer.scala    |  1 -
 .../apache/samza/task/TestStreamOperatorTask.java  | 34 +++++++-
 .../org/apache/samza/task/TestTaskFactoryUtil.java | 21 +++--
 6 files changed, 166 insertions(+), 47 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 387ad2e..a0ec7b8 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -21,6 +21,7 @@ package org.apache.samza.task;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
 import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.OperatorSpecGraph;
@@ -47,6 +48,14 @@ public class StreamOperatorTask implements AsyncStreamTask, InitableTask, Window
   private final OperatorSpecGraph specGraph;
   private final Clock clock;
 
+  /*
+   * Thread pool used by the task to schedule processing of incoming messages. If job.container.thread.pool.size is
+   * not configured, this will be null. We don't want to create an executor service within StreamOperatorTask for the
+   * following reasons:
+   *   1. It is harder to reason about the lifecycle of the executor service.
+   *   2. We end up with thread pool proliferation, especially for jobs with a high number of tasks.
+   */
+  private ExecutorService taskThreadPool;
   private OperatorImplGraph operatorImplGraph;
 
   /**
@@ -86,7 +95,9 @@ public class StreamOperatorTask implements AsyncStreamTask, InitableTask, Window
 
   /**
    * Passes the incoming message envelopes along to the {@link InputOperatorImpl} node
-   * for the input {@link SystemStream}.
+   * for the input {@link SystemStream}. It is non-blocking and dispatches the message to the container thread
+   * pool. The thread pool size is configured through job.container.thread.pool.size. In the absence of the config,
+   * the task executes the DAG on the run loop thread.
    * <p>
    * From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates its transformed output to
    * its chained {@link org.apache.samza.operators.impl.OperatorImpl}s itself.
@@ -99,40 +110,55 @@ public class StreamOperatorTask implements AsyncStreamTask, InitableTask, Window
   @Override
   public final void processAsync(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator,
       TaskCallback callback) {
-    SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
-    InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
-    if (inputOpImpl != null) {
-      CompletionStage<Void> processFuture;
-      MessageType messageType = MessageType.of(ime.getMessage());
-      switch (messageType) {
-        case USER_MESSAGE:
-          processFuture = inputOpImpl.onMessageAsync(ime, collector, coordinator);
-          break;
-
-        case END_OF_STREAM:
-          EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage();
-          processFuture =
-              inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator);
-          break;
-
-        case WATERMARK:
-          WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage();
-          processFuture =
-              inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector, coordinator);
-          break;
-
-        default:
-          processFuture = failedFuture(new SamzaException("Unknown message type " + messageType + " encountered."));
-          break;
-      }
-
-      processFuture.whenComplete((val, ex) -> {
-          if (ex != null) {
-            callback.failure(ex);
-          } else {
-            callback.complete();
+    Runnable processRunnable = () -> {
+      try {
+        SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
+        InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
+        if (inputOpImpl != null) {
+          CompletionStage<Void> processFuture;
+          MessageType messageType = MessageType.of(ime.getMessage());
+          switch (messageType) {
+            case USER_MESSAGE:
+              processFuture = inputOpImpl.onMessageAsync(ime, collector, coordinator);
+              break;
+
+            case END_OF_STREAM:
+              EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage();
+              processFuture =
+                  inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator);
+              break;
+
+            case WATERMARK:
+              WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage();
+              processFuture = inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector,
+                  coordinator);
+              break;
+
+            default:
+              processFuture = failedFuture(new SamzaException("Unknown message type " + messageType + " encountered."));
+              break;
           }
-        });
+
+          processFuture.whenComplete((val, ex) -> {
+              if (ex != null) {
+                callback.failure(ex);
+              } else {
+                callback.complete();
+              }
+            });
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to process the incoming message due to ", e);
+        callback.failure(e);
+      }
+    };
+
+    if (taskThreadPool != null) {
+      LOG.debug("Processing message using thread pool.");
+      taskThreadPool.submit(processRunnable);
+    } else {
+      LOG.debug("Processing message on the run loop thread.");
+      processRunnable.run();
     }
   }
 
@@ -153,6 +179,11 @@ public class StreamOperatorTask implements AsyncStreamTask, InitableTask, Window
     }
   }
 
+  /* package private setter for TaskFactoryUtil to initialize the taskThreadPool */
+  void setTaskThreadPool(ExecutorService taskThreadPool) {
+    this.taskThreadPool = taskThreadPool;
+  }
+
   /* package private for testing */
   OperatorImplGraph getOperatorImplGraph() {
     return this.operatorImplGraph;
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTaskFactory.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTaskFactory.java
new file mode 100644
index 0000000..0f9cc3b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTaskFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+
+/**
+ * Build {@link StreamOperatorTask} instances.
+ * <p>
+ * Implementations should return a new instance of {@link StreamOperatorTask} for each {@link #createInstance()} invocation.
+ * Note: This interface is not part of samza-api because it is a temporary hack introduced for SAMZA-2172. It will
+ * eventually go away with SAMZA-2203.
+ */
+interface StreamOperatorTaskFactory extends TaskFactory<AsyncStreamTask> {
+}
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
index 649483d..85549e4 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
@@ -48,7 +48,7 @@ public class TaskFactoryUtil {
     if (appDesc instanceof TaskApplicationDescriptorImpl) {
       return ((TaskApplicationDescriptorImpl) appDesc).getTaskFactory();
     } else if (appDesc instanceof StreamApplicationDescriptorImpl) {
-      return (AsyncStreamTaskFactory) () -> new StreamOperatorTask(
+      return (StreamOperatorTaskFactory) () -> new StreamOperatorTask(
           ((StreamApplicationDescriptorImpl) appDesc).getOperatorSpecGraph());
     }
     throw new IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be either TaskApplicationDescriptorImpl or "
@@ -112,6 +112,24 @@ public class TaskFactoryUtil {
       return factory;
     }
 
+    boolean isStreamOperatorTaskClass = factory instanceof StreamOperatorTaskFactory;
+
+    /*
+     * Note: Even though StreamOperatorTask is an instance of AsyncStreamTask, we still need to
+     * adapt it in order to inject the container thread pool. Long term, this will go away once we have the
+     * InternalTaskContext refactor, which would then become the entry point for exposing any of the runtime objects
+     * created in the container.
+     * Refer to SAMZA-2203 for more details.
+     */
+    if (isStreamOperatorTaskClass) {
+      log.info("Adapting StreamOperatorTaskFactory to inject container thread pool");
+      return (AsyncStreamTaskFactory) () -> {
+        StreamOperatorTask operatorTask = (StreamOperatorTask) factory.createInstance();
+        operatorTask.setTaskThreadPool(taskThreadPool);
+        return operatorTask;
+      };
+    }
+
     log.info("Converting StreamTask to AsyncStreamTaskAdapter");
     return (AsyncStreamTaskFactory) () ->
         new AsyncStreamTaskAdapter(((StreamTaskFactory) factory).createInstance(), taskThreadPool);
@@ -122,8 +140,12 @@ public class TaskFactoryUtil {
       throw new SamzaException("Either the task class name or the task factory instance is required.");
     }
 
-    if (!(factory instanceof StreamTaskFactory) && !(factory instanceof AsyncStreamTaskFactory)) {
-      throw new SamzaException(String.format("TaskFactory must be either StreamTaskFactory or AsyncStreamTaskFactory. %s is not supported",
+    boolean isValidFactory = factory instanceof StreamTaskFactory
+            || factory instanceof AsyncStreamTaskFactory
+            || factory instanceof StreamOperatorTaskFactory;
+
+    if (!isValidFactory) {
+      throw new SamzaException(String.format("TaskFactory must be either StreamTaskFactory or AsyncStreamTaskFactory or StreamOperatorTaskFactory. %s is not supported",
           factory.getClass()));
     }
   }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 1b356f6..068d494 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -43,7 +43,6 @@ import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor
 import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
 import org.apache.samza.context._
 import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode}
-import org.apache.samza.metadatastore.MetadataStoreFactory
 import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.serializers._
 import org.apache.samza.serializers.model.SamzaObjectMapper
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
index ab5e295..330cab9 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
@@ -19,21 +19,22 @@
 
 package org.apache.samza.task;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.JobContext;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.util.Clock;
 import org.junit.Test;
 
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 
 public class TestStreamOperatorTask {
-
   public static OperatorImplGraph getOperatorImplGraph(StreamOperatorTask task) {
     return task.getOperatorImplGraph();
   }
@@ -54,4 +55,29 @@ public class TestStreamOperatorTask {
     }
     operatorTask.close();
   }
+
+  /**
+   * Pass an invalid IME to processAsync. Any exceptions in processAsync should still get propagated through the
+   * task callback.
+   */
+  @Test
+  public void testExceptionsInProcessInvokesTaskCallback() throws InterruptedException {
+    ExecutorService taskThreadPool = Executors.newFixedThreadPool(2);
+    TaskCallback mockTaskCallback = mock(TaskCallback.class);
+    MessageCollector mockMessageCollector = mock(MessageCollector.class);
+    TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
+    StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class));
+    operatorTask.setTaskThreadPool(taskThreadPool);
+
+    CountDownLatch failureLatch = new CountDownLatch(1);
+
+    doAnswer(ctx -> {
+        failureLatch.countDown();
+        return null;
+      }).when(mockTaskCallback).failure(anyObject());
+
+    operatorTask.processAsync(mock(IncomingMessageEnvelope.class), mockMessageCollector,
+        mockTaskCoordinator, mockTaskCallback);
+    failureLatch.await();
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
index aa9a013..5db575d 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
@@ -31,9 +31,7 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 
 /**
@@ -95,6 +93,19 @@ public class TestTaskFactoryUtil {
     assertEquals(retFactory, mockAsyncStreamFactory);
   }
 
+  @Test
+  public void testFinalizeTaskFactoryForStreamOperatorTask() {
+    TaskFactory mockFactory = mock(StreamOperatorTaskFactory.class);
+    StreamOperatorTask mockStreamOperatorTask = mock(StreamOperatorTask.class);
+    when(mockFactory.createInstance())
+        .thenReturn(mockStreamOperatorTask);
+
+    ExecutorService mockThreadPool = mock(ExecutorService.class);
+    TaskFactory finalizedFactory = TaskFactoryUtil.finalizeTaskFactory(mockFactory, mockThreadPool);
+    finalizedFactory.createInstance();
+    verify(mockStreamOperatorTask, times(1)).setTaskThreadPool(eq(mockThreadPool));
+  }
+
   // test getTaskFactory with StreamApplicationDescriptor
   @Test
   public void testGetTaskFactoryWithStreamAppDescriptor() {
@@ -102,8 +113,8 @@ public class TestTaskFactoryUtil {
     OperatorSpecGraph mockSpecGraph = mock(OperatorSpecGraph.class);
     when(mockStreamApp.getOperatorSpecGraph()).thenReturn(mockSpecGraph);
     TaskFactory streamTaskFactory = TaskFactoryUtil.getTaskFactory(mockStreamApp);
-    assertTrue(streamTaskFactory instanceof AsyncStreamTaskFactory);
-    AsyncStreamTask streamTask = ((AsyncStreamTaskFactory) streamTaskFactory).createInstance();
+    assertTrue(streamTaskFactory instanceof StreamOperatorTaskFactory);
+    AsyncStreamTask streamTask = ((StreamOperatorTaskFactory) streamTaskFactory).createInstance();
     assertTrue(streamTask instanceof StreamOperatorTask);
     verify(mockSpecGraph).clone();
   }