You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/01/18 12:16:46 UTC

[3/15] Performance improvements: upgraded Kryo to version 2; removed the intermediate "EventMessage" class; refactored serialized message handling to use ByteBuffer instead of byte arrays (this reduces copies and allows further optimizations); as a res[...] (subject line truncated by the mail archive)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
index 75143c1..2bdcc5b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@ -23,6 +23,7 @@ import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.comm.RemoteEmitterFactory;
 import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.ClusterNode;
@@ -33,6 +34,7 @@ import org.mockito.Mockito;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
 
 /**
@@ -50,12 +52,14 @@ public class MockCommModule extends AbstractModule {
         /* The hashing function to map keys top partitions. */
         bind(Hasher.class).to(DefaultHasher.class);
         /* Use Kryo to serialize events. */
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
+                SerializerDeserializerFactory.class));
         bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
         bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
         bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
         bind(RemoteEmitterFactory.class).toInstance(Mockito.mock(RemoteEmitterFactory.class));
         bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
+
         Assignment mockedAssignment = Mockito.mock(Assignment.class);
         Mockito.when(mockedAssignment.assignClusterNode()).thenReturn(new ClusterNode(0, 0, "machine", "Task-0"));
         bind(Assignment.class).toInstance(mockedAssignment);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index db45fcb..9e0bebc 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -25,9 +25,8 @@ import java.util.concurrent.TimeUnit;
 import junit.framework.Assert;
 
 import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
-import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.core.Main;
@@ -113,9 +112,12 @@ public class WordCountTest extends ZkBasedTest {
 
     public void injectSentence(String sentence) throws IOException {
         Event event = new Event();
+        event.setStreamId("inputStream");
         event.put("sentence", String.class, sentence);
-        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
-                .serialize(event)));
+        emitter.send(
+                0,
+                injector.getInstance(SerializerDeserializerFactory.class)
+                        .createSerializerDeserializer(getClass().getClassLoader()).serialize(event));
     }
 
 }