You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/09/24 16:47:15 UTC
git commit: Use a custom module for loading app classes - can load
from either current class path, or from fetched S4R (using S4RLoader)
Updated Branches:
refs/heads/S4-95 6fd20746c -> 3c450d4a3
Use a custom module for loading app classes
- can load from either current class path, or from fetched S4R (using S4RLoader)
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/3c450d4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/3c450d4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/3c450d4a
Branch: refs/heads/S4-95
Commit: 3c450d4a3b1656f95280ac85625e934548473afe
Parents: 6fd2074
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Sep 24 18:43:43 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Mon Sep 24 18:43:43 2012 +0200
----------------------------------------------------------------------
.../main/java/org/apache/s4/core/AppModule.java | 27 +++++++++++++++
.../src/main/java/org/apache/s4/core/Main.java | 9 +++++
.../src/main/java/org/apache/s4/core/Receiver.java | 5 +--
.../src/main/java/org/apache/s4/core/Sender.java | 5 +--
.../src/main/java/org/apache/s4/core/Server.java | 9 +++--
.../test/java/org/apache/s4/core/TriggerTest.java | 3 +-
.../org/apache/s4/core/ft/CheckpointingTest.java | 3 +-
.../apache/s4/core/timers/MultithreadingTest.java | 4 ++-
.../apache/s4/core/windowing/WindowingPETest.java | 4 ++-
.../java/org/apache/s4/fixtures/CoreTestUtils.java | 12 ++++--
.../org/apache/s4/wordcount/WordCountTest.java | 3 +-
11 files changed, 66 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
new file mode 100644
index 0000000..4d07afe
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
@@ -0,0 +1,27 @@
+package org.apache.s4.core;
+
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+
+public class AppModule extends AbstractModule {
+
+ ClassLoader appClassLoader;
+
+ public AppModule(ClassLoader appClassLoader) {
+ this.appClassLoader = appClassLoader;
+ }
+
+ @Provides
+ public SerializerDeserializer provideSerializerDeserializer(SerializerDeserializerFactory serDeserFactory) {
+ return serDeserFactory.createSerializerDeserializer(appClassLoader);
+ }
+
+ @Override
+ protected void configure() {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index b9ccef5..85899b5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -154,6 +154,15 @@ public class Main {
combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
}
+ if (mainArgs.appClass != null) {
+ // In that case we won't be using an S4R classloader, app classes are available from the current
+ // classloader
+ // The app module provides bindings specific to the app class loader, in this case the current thread's
+ // class loader.
+ AppModule appModule = new AppModule(Thread.currentThread().getContextClassLoader());
+ combinedModule = Modules.override(combinedModule).with(appModule);
+ }
+
injector = Guice.createInjector(combinedModule);
if (mainArgs.appClass != null) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 1cf8fe8..7a9b5f2 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -24,7 +24,6 @@ import java.util.Map;
import org.apache.s4.base.Event;
import org.apache.s4.base.Listener;
import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.core.util.S4Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,9 +57,9 @@ public class Receiver implements Runnable {
private Thread thread;
@Inject
- public Receiver(Listener listener, SerializerDeserializerFactory serDeserFactory) {
+ public Receiver(Listener listener, SerializerDeserializer serDeser) {
this.listener = listener;
- this.serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
+ this.serDeser = serDeser;
thread = new Thread(this, "Receiver");
// TODO avoid starting the thread here
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 42cffa2..a308a15 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -24,7 +24,6 @@ import org.apache.s4.base.Emitter;
import org.apache.s4.base.Event;
import org.apache.s4.base.Hasher;
import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.core.util.S4Metrics;
@@ -62,9 +61,9 @@ public class Sender {
* a hashing function to map keys to partition IDs.
*/
@Inject
- public Sender(Emitter emitter, SerializerDeserializerFactory serDeserFactory, Hasher hasher, Assignment assignment) {
+ public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment) {
this.emitter = emitter;
- this.serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
+ this.serDeser = serDeser;
this.hasher = hasher;
this.assignment = assignment;
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 02e521a..c0c7b78 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -98,7 +98,7 @@ public class Server {
logger.info("Loading application [{}] from file [{}]", appName, s4r.getAbsolutePath());
S4RLoaderFactory loaderFactory = injector.getInstance(S4RLoaderFactory.class);
- S4RLoader cl = loaderFactory.createS4RLoader(s4r.getAbsolutePath());
+ S4RLoader appClassLoader = loaderFactory.createS4RLoader(s4r.getAbsolutePath());
try {
JarFile s4rFile = new JarFile(s4r);
if (s4rFile.getManifest() == null) {
@@ -115,9 +115,12 @@ public class Server {
App app = null;
try {
- Object o = (cl.loadClass(appClassName)).newInstance();
+ Object o = (appClassLoader.loadClass(appClassName)).newInstance();
app = (App) o;
- injector.injectMembers(app);
+ // we use the app module to provide bindings that depend upon a classloader savvy of app classes, e.g.
+ // for serialization/deserialization
+ AppModule appModule = new AppModule(appClassLoader);
+ injector.createChildInjector(appModule).injectMembers(app);
} catch (Exception e) {
logger.error("Could not load s4 application form s4r file [{" + s4r.getAbsolutePath() + "}]", e);
return null;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index f7236cf..ba3b484 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -64,7 +64,8 @@ public abstract class TriggerTest extends ZkBasedTest {
protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
final ZooKeeper zk = CommTestUtils.createZkClient();
- Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
+ Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule(), new AppModule(getClass()
+ .getClassLoader()));
app = injector.getInstance(TriggeredApp.class);
app.init();
app.start();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
index b9543c8..e0bd874 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@ -31,6 +31,7 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.s4.base.Event;
import org.apache.s4.base.KeyFinder;
import org.apache.s4.core.App;
+import org.apache.s4.core.AppModule;
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.core.Stream;
import org.apache.s4.core.ft.FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.DummyZKStorageCallbackFactory;
@@ -81,7 +82,7 @@ public class CheckpointingTest {
CoreTestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
Injector injector = Guice.createInjector(new MockCommModule(),
- new MockCoreModuleWithFileBaseCheckpointingBackend());
+ new MockCoreModuleWithFileBaseCheckpointingBackend(), new AppModule(getClass().getClassLoader()));
TestApp app = injector.getInstance(TestApp.class);
app.init();
app.start();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
index a4b6827..f340e93 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
import org.apache.s4.base.KeyFinder;
import org.apache.s4.core.App;
+import org.apache.s4.core.AppModule;
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.core.Stream;
import org.apache.s4.fixtures.MockCommModule;
@@ -51,7 +52,8 @@ public class MultithreadingTest {
*/
@Test
public void testSynchronization() throws IOException, InterruptedException {
- Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
+ Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule(), new AppModule(getClass()
+ .getClassLoader()));
TestApp app = injector.getInstance(TestApp.class);
app.count = 2; // One for the event, another for the timer
app.init();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
index 5d06b28..a14f617 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
import org.apache.s4.base.KeyFinder;
import org.apache.s4.core.App;
+import org.apache.s4.core.AppModule;
import org.apache.s4.core.Stream;
import org.apache.s4.core.window.AbstractSlidingWindowPE;
import org.apache.s4.core.window.DefaultAggregatingSlot;
@@ -57,7 +58,8 @@ public class WindowingPETest {
ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
.getLogger(Logger.ROOT_LOGGER_NAME);
root.setLevel(Level.DEBUG);
- Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
+ Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule(), new AppModule(getClass()
+ .getClassLoader()));
TestTimeWindowedApp app = injector.getInstance(TestTimeWindowedApp.class);
app.init();
app.start();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 7e88dd8..79c3e2f 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -26,6 +26,7 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.core.AppModule;
import org.apache.s4.core.DefaultCoreModule;
import org.apache.s4.core.Main;
import org.gradle.tooling.BuildLauncher;
@@ -114,9 +115,12 @@ public class CoreTestUtils extends CommTestUtils {
}
public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
- return Guice.createInjector(Modules.override(
- new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
- new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
- new NonFailFastZookeeperClientsModule()));
+ return Guice.createInjector(
+ Modules.override(
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(),
+ "cluster1"),
+ new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
+ new NonFailFastZookeeperClientsModule()), new AppModule(Thread.currentThread()
+ .getContextClassLoader()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/3c450d4a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 9e0bebc..f0a18f2 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -28,6 +28,7 @@ import org.apache.s4.base.Event;
import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.core.AppModule;
import org.apache.s4.core.DefaultCoreModule;
import org.apache.s4.core.Main;
import org.apache.s4.fixtures.CommTestUtils;
@@ -68,7 +69,7 @@ public class WordCountTest extends ZkBasedTest {
public void prepareEmitter() throws IOException {
injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
.openStream(), "cluster1"), new DefaultCoreModule(Resources.getResource("default.s4.core.properties")
- .openStream()));
+ .openStream()), new AppModule(getClass().getClassLoader()));
emitter = injector.getInstance(TCPEmitter.class);