You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2022/02/14 17:29:42 UTC

[samza] branch SAMZA-2721 created (now ad596c8)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a change to branch SAMZA-2721
in repository https://gitbox.apache.org/repos/asf/samza.git.


      at ad596c8  SAMZA-2721: Container should exit with non-zero status code in case of errors during launch

This branch includes the following new commits:

     new ad596c8  SAMZA-2721: Container should exit with non-zero status code in case of errors during launch

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[samza] 01/01: SAMZA-2721: Container should exit with non-zero status code in case of errors during launch

Posted by bh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch SAMZA-2721
in repository https://gitbox.apache.org/repos/asf/samza.git

commit ad596c87c1c8c07ca58b29e29974227b29444a29
Author: Bharath Kumarasubramanian <bh...@apache.org>
AuthorDate: Mon Feb 14 09:28:38 2022 -0800

    SAMZA-2721: Container should exit with non-zero status code in case of errors during launch
---
 .../apache/samza/runtime/ContainerLaunchUtil.java  | 23 ++++++--
 .../samza/runtime/TestContainerLaunchUtil.java     | 68 ++++++++++++++++++++++
 2 files changed, 86 insertions(+), 5 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 7314a86..42d3c91 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.runtime;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -99,10 +100,11 @@ public class ContainerLaunchUtil {
     run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId, jobModel, config,
         buildExternalContext(config));
 
-    System.exit(0);
+    exitProcess(0);
   }
 
-  private static void run(
+  @VisibleForTesting
+  static void run(
       ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
       String jobName,
       String jobId,
@@ -112,7 +114,7 @@ public class ContainerLaunchUtil {
       JobModel jobModel,
       Config config,
       Optional<ExternalContext> externalContextOptional) {
-    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, new MetricsRegistryMap());
+    CoordinatorStreamStore coordinatorStreamStore = buildCoordinatorStreamStore(config, new MetricsRegistryMap());
     coordinatorStreamStore.init();
 
     try {
@@ -179,15 +181,26 @@ public class ContainerLaunchUtil {
 
       if (containerRunnerException != null) {
         log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
-        System.exit(1);
+        exitProcess(1);
       }
     } catch (Throwable e) {
-      log.error("Container stopped with Exception. ", containerRunnerException);
+      log.error("Exiting the process due to {}. \nContainer runner exception: {}", e, containerRunnerException);
+      exitProcess(1);
     } finally {
       coordinatorStreamStore.close();
     }
   }
 
+  @VisibleForTesting // factory seam: run() obtains its store here, so a test can substitute a stub
+  static CoordinatorStreamStore buildCoordinatorStreamStore(Config config, MetricsRegistryMap metricsRegistryMap) {
+    return new CoordinatorStreamStore(config, metricsRegistryMap); // forwards both arguments unchanged to the constructor
+  }
+
+  @VisibleForTesting // exit seam: callers in this class go through here instead of calling System.exit directly
+  static void exitProcess(int status) {
+    System.exit(status); // ends the JVM with the supplied status code
+  }
+
   private static Optional<ExternalContext> buildExternalContext(Config config) {
     /*
      * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
new file mode 100644
index 0000000..ec57991
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.job.model.JobModel;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ContainerLaunchUtil.class) // ContainerLaunchUtil's static methods are stubbed below
+public class TestContainerLaunchUtil { // checks that run() reaches exitProcess(1) on the failure path
+  private static final String JOB_NAME = "test-job";
+  private static final String JOB_ID = "test-job-i001";
+  private static final String CONTAINER_ID = "test-job-container-0001";
+
+  private static final ApplicationDescriptorImpl APP_DESC = mock(ApplicationDescriptorImpl.class); // unconfigured mock
+  private static final JobModel JOB_MODEL = mock(JobModel.class); // unconfigured mock
+  private static final Config CONFIG = mock(Config.class); // unconfigured mock
+
+  @Test
+  public void testRunWithException() throws Exception {
+    final CountDownLatch completionLatch = new CountDownLatch(1); // counted down only by the exitProcess(1) stub
+    PowerMockito.mockStatic(ContainerLaunchUtil.class); // put the class's static methods under PowerMock control
+    PowerMockito.doReturn(mock(CoordinatorStreamStore.class)) // hand run() a mock store instead of a real one
+        .when(ContainerLaunchUtil.class, "buildCoordinatorStreamStore", eq(CONFIG), any());
+    PowerMockito.doAnswer(invocation -> { // stand-in for exitProcess(1): record the call, do not exit the JVM
+      completionLatch.countDown();
+      return null;
+    }).when(ContainerLaunchUtil.class, "exitProcess", eq(1));
+    PowerMockito.doCallRealMethod() // run() itself must execute its real body for the test to be meaningful
+        .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), eq(JOB_ID), eq(CONTAINER_ID), any(), any(),
+            eq(JOB_MODEL), eq(CONFIG), any());
+
+    ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, Optional.empty(), Optional.empty(), JOB_MODEL,
+        CONFIG, Optional.empty()); // NOTE(review): presumably fails inside run() because the inputs are bare mocks - confirm
+    assertTrue(completionLatch.await(1, TimeUnit.SECONDS)); // passes only if exitProcess(1) was invoked
+  }
+}