You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/09/19 21:26:55 UTC
samza git commit: SAMZA-1416: Better logging around the exception
where class loading failed in initializing the SystemFactory for an
input/output system
Repository: samza
Updated Branches:
refs/heads/master e85b01dcb -> 0f06da1f8
SAMZA-1416: Better logging around the exception where class loading failed in initializing the SystemFactory for an input/output system
Also added test coverage for the Util.getObj method.
nickpan47 jmakes
Author: Daniel Nishimura <dn...@gmail.com>
Reviewers: Jacob Maes <jm...@linkedin.com>
Closes #296 from dnishimura/samza-1416
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0f06da1f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0f06da1f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0f06da1f
Branch: refs/heads/master
Commit: 0f06da1f80566f3090f9826b46fe46a3fadcd3f2
Parents: e85b01d
Author: Daniel Nishimura <dn...@gmail.com>
Authored: Tue Sep 19 14:26:38 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Tue Sep 19 14:26:38 2017 -0700
----------------------------------------------------------------------
.../apache/samza/config/JavaSystemConfig.java | 14 ++--
.../org/apache/samza/config/SystemConfig.scala | 16 ++---
.../main/scala/org/apache/samza/util/Util.scala | 15 +++--
.../samza/config/TestJavaSystemConfig.java | 35 ++++++++--
.../apache/samza/config/TestSystemConfig.scala | 67 ++++++++++++++++++++
.../scala/org/apache/samza/util/TestUtil.scala | 13 ++++
6 files changed, 137 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
index 350f20c..57707aa 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
@@ -33,10 +34,10 @@ import org.apache.samza.util.Util;
* a java version of the system config
*/
public class JavaSystemConfig extends MapConfig {
- private static final String SYSTEM_PREFIX = "systems.";
- private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
- private static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
- private static final String SYSTEM_DEFAULT_STREAMS_PREFIX = SYSTEM_PREFIX + "%s" + ".default.stream.";
+ public static final String SYSTEM_PREFIX = "systems.";
+ public static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
+ public static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
+ private static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_PREFIX + "%s" + ".default.stream.";
private static final String EMPTY = "";
public JavaSystemConfig(Config config) {
@@ -48,7 +49,8 @@ public class JavaSystemConfig extends MapConfig {
return null;
}
String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, name);
- return get(systemFactory, null);
+ String value = get(systemFactory, null);
+ return (StringUtils.isBlank(value)) ? null : value;
}
/**
@@ -108,6 +110,6 @@ public class JavaSystemConfig extends MapConfig {
* @return a subset of the config with the system prefix removed.
*/
public Config getDefaultStreamProperties(String systemName) {
- return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX, systemName), true);
+ return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, systemName), true);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index 804955c..91fb261 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -27,15 +27,17 @@ import org.apache.samza.util.Logging
*/
object SystemConfig {
// system config constants
- val SYSTEM_PREFIX = "systems.%s."
- val SYSTEM_FACTORY = "systems.%s.samza.factory"
+ val SYSTEM_PREFIX = JavaSystemConfig.SYSTEM_PREFIX + "%s."
+ val SYSTEM_FACTORY = JavaSystemConfig.SYSTEM_FACTORY_FORMAT
val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
implicit def Config2System(config: Config) = new SystemConfig(config)
}
class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
- def getSystemFactory(name: String) = getOption(SystemConfig.SYSTEM_FACTORY format name)
+ val javaSystemConfig = new JavaSystemConfig(config)
+
+ def getSystemFactory(name: String) = Option(javaSystemConfig.getSystemFactory(name))
def getSystemKeySerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.KEY_SERDE)
@@ -47,14 +49,10 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
* Returns a list of all system names from the config file. Useful for
* getting individual systems.
*/
- def getSystemNames() = {
- val subConf = config.subset("systems.", true)
- // find all .samza.factory keys, and strip the suffix
- subConf.asScala.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", ""))
- }
+ def getSystemNames() = javaSystemConfig.getSystemNames().asScala
private def getSystemDefaultStreamProperty(name: String, property: String) = {
- val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(name)
+ val defaultStreamProperties = javaSystemConfig.getDefaultStreamProperties(name)
val streamDefault = defaultStreamProperties.get(property)
if (!(streamDefault == null || streamDefault.isEmpty)) {
Option(streamDefault)
http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 6c224e6..d639620 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -81,10 +81,17 @@ object Util extends Logging {
* Instantiate a class instance from a given className.
*/
def getObj[T](className: String) = {
- Class
- .forName(className)
- .newInstance
- .asInstanceOf[T]
+ try {
+ Class
+ .forName(className)
+ .newInstance
+ .asInstanceOf[T]
+ } catch {
+ case e: Throwable => {
+ error("Unable to instantiate a class instance for %s." format className, e)
+ throw e
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
index 9b39ec8..94ba374 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
@@ -27,15 +27,42 @@ import java.util.Map;
import org.junit.Test;
public class TestJavaSystemConfig {
+ private static final String MOCK_SYSTEM_NAME1 = "mocksystem1";
+ private static final String MOCK_SYSTEM_NAME2 = "mocksystem2";
+ private static final String MOCK_SYSTEM_FACTORY_NAME1 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME1);
+ private static final String MOCK_SYSTEM_FACTORY_NAME2 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME2);
+ private static final String MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1";
+ private static final String MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2";
+
+ @Test
+ public void testClassName() {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
+ JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
+
+ assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1));
+ }
+
+ @Test
+ public void testGetEmptyClassNameAsNull() {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put(MOCK_SYSTEM_FACTORY_NAME1, "");
+ map.put(MOCK_SYSTEM_FACTORY_NAME2, " ");
+ JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
+
+ assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1));
+ assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME2));
+ }
@Test
public void testGetSystemNames() {
Map<String, String> map = new HashMap<String, String>();
- map.put("systems.system1.samza.factory", "1");
- map.put("systems.system2.samza.factory", "2");
- JavaSystemConfig systemConfig = new JavaSystemConfig(
- new MapConfig(map));
+ map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1);
+ map.put(MOCK_SYSTEM_FACTORY_NAME2, MOCK_SYSTEM_FACTORY_CLASSNAME2);
+ JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map));
assertEquals(2, systemConfig.getSystemNames().size());
+ assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME1));
+ assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME2));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala
new file mode 100644
index 0000000..cc54d00
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import scala.collection.JavaConverters._
+import org.apache.samza.config.SystemConfig.{Config2System, SYSTEM_FACTORY}
+import org.junit.Assert._
+import org.junit.Test
+
+class TestSystemConfig {
+ val MOCK_SYSTEM_NAME1 = "mocksystem1"
+ val MOCK_SYSTEM_NAME2 = "mocksystem2"
+ val MOCK_SYSTEM_FACTORY_NAME1 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME1)
+ val MOCK_SYSTEM_FACTORY_NAME2 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME2)
+ val MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1"
+ val MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2"
+
+ @Test def testClassName { // JUnit 4 only runs methods annotated with @Test
+ val configMap = Map[String, String](
+ MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1
+ )
+ val config = new MapConfig(configMap.asJava)
+
+ assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, config.getSystemFactory(MOCK_SYSTEM_NAME1).getOrElse(""))
+ }
+
+ @Test
+ def testGetEmptyClassNameAsNull {
+ val configMap = Map[String, String](
+ MOCK_SYSTEM_FACTORY_NAME1 -> "",
+ MOCK_SYSTEM_FACTORY_NAME2 -> " " // distinct key, so the "" entry for system 1 is not overwritten
+ )
+ val config = new MapConfig(configMap.asJava)
+
+ assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME1), None)
+ assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME2), None)
+ }
+
+ @Test def testGetSystemNames { // JUnit 4 only runs methods annotated with @Test
+ val configMap = Map[String, String](
+ MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1,
+ MOCK_SYSTEM_FACTORY_NAME2 -> MOCK_SYSTEM_FACTORY_CLASSNAME2
+ )
+ val config = new MapConfig(configMap.asJava)
+ val systemNames = config.getSystemNames()
+
+ assertTrue(systemNames.contains(MOCK_SYSTEM_NAME1))
+ assertTrue(systemNames.contains(MOCK_SYSTEM_NAME2))
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index da7c71d..1f7dc01 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -110,4 +110,17 @@ class TestUtil {
assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, Long.MinValue))
assertEquals(-1, Util.clampAdd(Long.MaxValue, Long.MinValue))
}
+
+ @Test
+ def testGetObjExistingClass() {
+ val obj = Util.getObj[MapConfig]("org.apache.samza.config.MapConfig")
+ assertNotNull(obj)
+ assertEquals(classOf[MapConfig], obj.getClass())
+ }
+
+ @Test(expected = classOf[ClassNotFoundException])
+ def testGetObjNonexistentClass() {
+ Util.getObj("this.class.does.NotExist")
+ assert(false, "This should not get hit.")
+ }
}