You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/09/19 21:26:55 UTC

samza git commit: SAMZA-1416: Better logging around the exception where class loading failed in initializing the SystemFactory for a input/output system

Repository: samza
Updated Branches:
  refs/heads/master e85b01dcb -> 0f06da1f8


SAMZA-1416: Better logging around the exception where class loading failed in initializing the SystemFactory for a input/output system

Also added test coverage for the Util.getObj method.
nickpan47 jmakes

Author: Daniel Nishimura <dn...@gmail.com>

Reviewers: Jacob Maes <jm...@linkedin.com>

Closes #296 from dnishimura/samza-1416


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0f06da1f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0f06da1f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0f06da1f

Branch: refs/heads/master
Commit: 0f06da1f80566f3090f9826b46fe46a3fadcd3f2
Parents: e85b01d
Author: Daniel Nishimura <dn...@gmail.com>
Authored: Tue Sep 19 14:26:38 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Tue Sep 19 14:26:38 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/config/JavaSystemConfig.java   | 14 ++--
 .../org/apache/samza/config/SystemConfig.scala  | 16 ++---
 .../main/scala/org/apache/samza/util/Util.scala | 15 +++--
 .../samza/config/TestJavaSystemConfig.java      | 35 ++++++++--
 .../apache/samza/config/TestSystemConfig.scala  | 67 ++++++++++++++++++++
 .../scala/org/apache/samza/util/TestUtil.scala  | 13 ++++
 6 files changed, 137 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
index 350f20c..57707aa 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
@@ -33,10 +34,10 @@ import org.apache.samza.util.Util;
  * a java version of the system config
  */
 public class JavaSystemConfig extends MapConfig {
-  private static final String SYSTEM_PREFIX = "systems.";
-  private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
-  private static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
-  private static final String SYSTEM_DEFAULT_STREAMS_PREFIX = SYSTEM_PREFIX + "%s" + ".default.stream.";
+  public static final String SYSTEM_PREFIX = "systems.";
+  public static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
+  public static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
+  private static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_PREFIX + "%s" + ".default.stream.";
   private static final String EMPTY = "";
 
   public JavaSystemConfig(Config config) {
@@ -48,7 +49,8 @@ public class JavaSystemConfig extends MapConfig {
       return null;
     }
     String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, name);
-    return get(systemFactory, null);
+    String value = get(systemFactory, null);
+    return (StringUtils.isBlank(value)) ? null : value;
   }
 
   /**
@@ -108,6 +110,6 @@ public class JavaSystemConfig extends MapConfig {
    * @return a subset of the config with the system prefix removed.
    */
   public Config getDefaultStreamProperties(String systemName) {
-    return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX, systemName), true);
+    return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, systemName), true);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index 804955c..91fb261 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -27,15 +27,17 @@ import org.apache.samza.util.Logging
   */
 object SystemConfig {
   // system config constants
-  val SYSTEM_PREFIX = "systems.%s."
-  val SYSTEM_FACTORY = "systems.%s.samza.factory"
+  val SYSTEM_PREFIX = JavaSystemConfig.SYSTEM_PREFIX + "%s."
+  val SYSTEM_FACTORY = JavaSystemConfig.SYSTEM_FACTORY_FORMAT
   val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
 
   implicit def Config2System(config: Config) = new SystemConfig(config)
 }
 
 class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
-  def getSystemFactory(name: String) = getOption(SystemConfig.SYSTEM_FACTORY format name)
+  val javaSystemConfig = new JavaSystemConfig(config)
+
+  def getSystemFactory(name: String) = Option(javaSystemConfig.getSystemFactory(name))
 
   def getSystemKeySerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.KEY_SERDE)
 
@@ -47,14 +49,10 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
    * Returns a list of all system names from the config file. Useful for
    * getting individual systems.
    */
-  def getSystemNames() = {
-    val subConf = config.subset("systems.", true)
-    // find all .samza.factory keys, and strip the suffix
-    subConf.asScala.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", ""))
-  }
+  def getSystemNames() = javaSystemConfig.getSystemNames().asScala
 
   private def getSystemDefaultStreamProperty(name: String, property: String) = {
-    val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(name)
+    val defaultStreamProperties = javaSystemConfig.getDefaultStreamProperties(name)
     val streamDefault = defaultStreamProperties.get(property)
     if (!(streamDefault == null || streamDefault.isEmpty)) {
       Option(streamDefault)

http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 6c224e6..d639620 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -81,10 +81,17 @@ object Util extends Logging {
    * Instantiate a class instance from a given className.
    */
   def getObj[T](className: String) = {
-    Class
-      .forName(className)
-      .newInstance
-      .asInstanceOf[T]
+    try {
+      Class
+        .forName(className)
+        .newInstance
+        .asInstanceOf[T]
+    } catch {
+      case e: Throwable => {
+        error("Unable to instantiate a class instance for %s." format className, e)
+        throw e
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
index 9b39ec8..94ba374 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
@@ -27,15 +27,42 @@ import java.util.Map;
 import org.junit.Test;
 
 public class TestJavaSystemConfig {
+  private static final String MOCK_SYSTEM_NAME1 = "mocksystem1";
+  private static final String MOCK_SYSTEM_NAME2 = "mocksystem2";
+  private static final String MOCK_SYSTEM_FACTORY_NAME1 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME1);
+  private static final String MOCK_SYSTEM_FACTORY_NAME2 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME2);
+  private static final String MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1";
+  private static final String MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2";
+
+  @Test
+  public void testClassName() {
+    Map<String, String> map = new HashMap<String, String>();
+    map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
+    JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
+
+    assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1));
+  }
+
+  @Test
+  public void testGetEmptyClassNameAsNull() {
+    Map<String, String> map = new HashMap<String, String>();
+    map.put(MOCK_SYSTEM_FACTORY_NAME1, "");
+    map.put(MOCK_SYSTEM_FACTORY_NAME2, " ");
+    JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
+
+    assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1));
+    assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME2));
+  }
 
   @Test
   public void testGetSystemNames() {
     Map<String, String> map = new HashMap<String, String>();
-    map.put("systems.system1.samza.factory", "1");
-    map.put("systems.system2.samza.factory", "2");
-    JavaSystemConfig systemConfig = new JavaSystemConfig(
-        new MapConfig(map));
+    map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
+    map.put(MOCK_SYSTEM_FACTORY_NAME2, MOCK_SYSTEM_FACTORY_CLASSNAME2);
+    JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
 
     assertEquals(2, systemConfig.getSystemNames().size());
+    assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME1));
+    assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME2));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala
new file mode 100644
index 0000000..cc54d00
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import scala.collection.JavaConverters._
+import org.apache.samza.config.SystemConfig.{Config2System, SYSTEM_FACTORY}
+import org.junit.Assert._
+import org.junit.Test
+
+class TestSystemConfig {
+  val MOCK_SYSTEM_NAME1 = "mocksystem1"
+  val MOCK_SYSTEM_NAME2 = "mocksystem2"
+  val MOCK_SYSTEM_FACTORY_NAME1 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME1)
+  val MOCK_SYSTEM_FACTORY_NAME2 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME2)
+  val MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1"
+  val MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2"
+
+  def testClassName {
+    val configMap = Map[String, String](
+      MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1
+    )
+    val config = new MapConfig(configMap.asJava)
+
+    assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, config.getSystemFactory(MOCK_SYSTEM_NAME1).getOrElse(""))
+  }
+
+  @Test
+  def testGetEmptyClassNameAsNull {
+    val configMap = Map[String, String](
+      MOCK_SYSTEM_FACTORY_NAME1 -> "",
+      MOCK_SYSTEM_FACTORY_NAME1 -> " "
+    )
+    val config = new MapConfig(configMap.asJava)
+
+    assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME1), None)
+    assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME2), None)
+  }
+
+  def testGetSystemNames {
+    val configMap = Map[String, String](
+      MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1,
+      MOCK_SYSTEM_FACTORY_NAME2 -> MOCK_SYSTEM_FACTORY_CLASSNAME2
+    )
+    val config = new MapConfig(configMap.asJava)
+    val systemNames = config.getSystemNames()
+
+    assertTrue(systemNames.contains(MOCK_SYSTEM_NAME1))
+    assertTrue(systemNames.contains(MOCK_SYSTEM_NAME2))
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index da7c71d..1f7dc01 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -110,4 +110,17 @@ class TestUtil {
     assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, Long.MinValue))
     assertEquals(-1, Util.clampAdd(Long.MaxValue, Long.MinValue))
   }
+
+  @Test
+  def testGetObjExistingClass() {
+    val obj = Util.getObj[MapConfig]("org.apache.samza.config.MapConfig")
+    assertNotNull(obj)
+    assertEquals(classOf[MapConfig], obj.getClass())
+  }
+
+  @Test(expected = classOf[ClassNotFoundException])
+  def testGetObjNonexistentClass() {
+    Util.getObj("this.class.does.NotExist")
+    assert(false, "This should not get hit.")
+  }
 }