You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/09/24 16:47:15 UTC

git commit: Use a custom module for loading app classes - can load from either current class path, or from fetched S4R (using S4RLoader)

Updated Branches:
  refs/heads/S4-95 6fd20746c -> 3c450d4a3


Use a custom module for loading app classes
- can load from either current class path, or from fetched S4R (using S4RLoader)


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/3c450d4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/3c450d4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/3c450d4a

Branch: refs/heads/S4-95
Commit: 3c450d4a3b1656f95280ac85625e934548473afe
Parents: 6fd2074
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Sep 24 18:43:43 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Mon Sep 24 18:43:43 2012 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/s4/core/AppModule.java    |   27 +++++++++++++++
 .../src/main/java/org/apache/s4/core/Main.java     |    9 +++++
 .../src/main/java/org/apache/s4/core/Receiver.java |    5 +--
 .../src/main/java/org/apache/s4/core/Sender.java   |    5 +--
 .../src/main/java/org/apache/s4/core/Server.java   |    9 +++--
 .../test/java/org/apache/s4/core/TriggerTest.java  |    3 +-
 .../org/apache/s4/core/ft/CheckpointingTest.java   |    3 +-
 .../apache/s4/core/timers/MultithreadingTest.java  |    4 ++-
 .../apache/s4/core/windowing/WindowingPETest.java  |    4 ++-
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |   12 ++++--
 .../org/apache/s4/wordcount/WordCountTest.java     |    3 +-
 11 files changed, 66 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
new file mode 100644
index 0000000..4d07afe
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
@@ -0,0 +1,27 @@
+package org.apache.s4.core;
+
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+
+public class AppModule extends AbstractModule {
+
+    ClassLoader appClassLoader;
+
+    public AppModule(ClassLoader appClassLoader) {
+        this.appClassLoader = appClassLoader;
+    }
+
+    @Provides
+    public SerializerDeserializer provideSerializerDeserializer(SerializerDeserializerFactory serDeserFactory) {
+        return serDeserFactory.createSerializerDeserializer(appClassLoader);
+    }
+
+    @Override
+    protected void configure() {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index b9ccef5..85899b5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -154,6 +154,15 @@ public class Main {
                 combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
             }
 
+            if (mainArgs.appClass != null) {
+                // In that case we won't be using an S4R classloader, app classes are available from the current
+                // classloader
+                // The app module provides bindings specific to the app class loader, in this case the current thread's
+                // class loader.
+                AppModule appModule = new AppModule(Thread.currentThread().getContextClassLoader());
+                combinedModule = Modules.override(combinedModule).with(appModule);
+            }
+
             injector = Guice.createInjector(combinedModule);
 
             if (mainArgs.appClass != null) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 1cf8fe8..7a9b5f2 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import org.apache.s4.base.Event;
 import org.apache.s4.base.Listener;
 import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.core.util.S4Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,9 +57,9 @@ public class Receiver implements Runnable {
     private Thread thread;
 
     @Inject
-    public Receiver(Listener listener, SerializerDeserializerFactory serDeserFactory) {
+    public Receiver(Listener listener, SerializerDeserializer serDeser) {
         this.listener = listener;
-        this.serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
+        this.serDeser = serDeser;
 
         thread = new Thread(this, "Receiver");
         // TODO avoid starting the thread here

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 42cffa2..a308a15 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -24,7 +24,6 @@ import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Event;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.core.util.S4Metrics;
@@ -62,9 +61,9 @@ public class Sender {
      *            a hashing function to map keys to partition IDs.
      */
     @Inject
-    public Sender(Emitter emitter, SerializerDeserializerFactory serDeserFactory, Hasher hasher, Assignment assignment) {
+    public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment) {
         this.emitter = emitter;
-        this.serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
+        this.serDeser = serDeser;
         this.hasher = hasher;
         this.assignment = assignment;
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 02e521a..c0c7b78 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -98,7 +98,7 @@ public class Server {
         logger.info("Loading application [{}] from file [{}]", appName, s4r.getAbsolutePath());
 
         S4RLoaderFactory loaderFactory = injector.getInstance(S4RLoaderFactory.class);
-        S4RLoader cl = loaderFactory.createS4RLoader(s4r.getAbsolutePath());
+        S4RLoader appClassLoader = loaderFactory.createS4RLoader(s4r.getAbsolutePath());
         try {
             JarFile s4rFile = new JarFile(s4r);
             if (s4rFile.getManifest() == null) {
@@ -115,9 +115,12 @@ public class Server {
             App app = null;
 
             try {
-                Object o = (cl.loadClass(appClassName)).newInstance();
+                Object o = (appClassLoader.loadClass(appClassName)).newInstance();
                 app = (App) o;
-                injector.injectMembers(app);
+                // we use the app module to provide bindings that depend upon a classloader savy of app classes, e.g.
+                // for serialization/deserialization
+                AppModule appModule = new AppModule(appClassLoader);
+                injector.createChildInjector(appModule).injectMembers(app);
             } catch (Exception e) {
                 logger.error("Could not load s4 application form s4r file [{" + s4r.getAbsolutePath() + "}]", e);
                 return null;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index f7236cf..ba3b484 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -64,7 +64,8 @@ public abstract class TriggerTest extends ZkBasedTest {
 
     protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
         final ZooKeeper zk = CommTestUtils.createZkClient();
-        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
+        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule(), new AppModule(getClass()
+                .getClassLoader()));
         app = injector.getInstance(TriggeredApp.class);
         app.init();
         app.start();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
index b9543c8..e0bd874 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@ -31,6 +31,7 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.s4.base.Event;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.core.App;
+import org.apache.s4.core.AppModule;
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Stream;
 import org.apache.s4.core.ft.FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.DummyZKStorageCallbackFactory;
@@ -81,7 +82,7 @@ public class CheckpointingTest {
         CoreTestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
 
         Injector injector = Guice.createInjector(new MockCommModule(),
-                new MockCoreModuleWithFileBaseCheckpointingBackend());
+                new MockCoreModuleWithFileBaseCheckpointingBackend(), new AppModule(getClass().getClassLoader()));
         TestApp app = injector.getInstance(TestApp.class);
         app.init();
         app.start();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
index a4b6827..f340e93 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.s4.base.Event;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.core.App;
+import org.apache.s4.core.AppModule;
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Stream;
 import org.apache.s4.fixtures.MockCommModule;
@@ -51,7 +52,8 @@ public class MultithreadingTest {
      */
     @Test
     public void testSynchronization() throws IOException, InterruptedException {
-        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
+        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule(), new AppModule(getClass()
+                .getClassLoader()));
         TestApp app = injector.getInstance(TestApp.class);
         app.count = 2; // One for the event, another for the timer
         app.init();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
index 5d06b28..a14f617 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.s4.base.Event;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.core.App;
+import org.apache.s4.core.AppModule;
 import org.apache.s4.core.Stream;
 import org.apache.s4.core.window.AbstractSlidingWindowPE;
 import org.apache.s4.core.window.DefaultAggregatingSlot;
@@ -57,7 +58,8 @@ public class WindowingPETest {
         ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
                 .getLogger(Logger.ROOT_LOGGER_NAME);
         root.setLevel(Level.DEBUG);
-        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
+        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule(), new AppModule(getClass()
+                .getClassLoader()));
         TestTimeWindowedApp app = injector.getInstance(TestTimeWindowedApp.class);
         app.init();
         app.start();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 7e88dd8..79c3e2f 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -26,6 +26,7 @@ import java.util.List;
 import junit.framework.Assert;
 
 import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.core.AppModule;
 import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.core.Main;
 import org.gradle.tooling.BuildLauncher;
@@ -114,9 +115,12 @@ public class CoreTestUtils extends CommTestUtils {
     }
 
     public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
-        return Guice.createInjector(Modules.override(
-                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
-                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
-                new NonFailFastZookeeperClientsModule()));
+        return Guice.createInjector(
+                Modules.override(
+                        new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(),
+                                "cluster1"),
+                        new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
+                        new NonFailFastZookeeperClientsModule()), new AppModule(Thread.currentThread()
+                        .getContextClassLoader()));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 9e0bebc..f0a18f2 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -28,6 +28,7 @@ import org.apache.s4.base.Event;
 import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.core.AppModule;
 import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.core.Main;
 import org.apache.s4.fixtures.CommTestUtils;
@@ -68,7 +69,7 @@ public class WordCountTest extends ZkBasedTest {
     public void prepareEmitter() throws IOException {
         injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
                 .openStream(), "cluster1"), new DefaultCoreModule(Resources.getResource("default.s4.core.properties")
-                .openStream()));
+                .openStream()), new AppModule(getClass().getClassLoader()));
 
         emitter = injector.getInstance(TCPEmitter.class);