You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/01/18 12:16:46 UTC
[3/15] Performance improvements
- upgraded kryo to version 2
- removed intermediate "EventMessage" class
- refactored serialized message handling by using ByteBuffer instead of byte arrays (this reduces copies and allows further optimizations)
- as a res... [subject line truncated in the archive]
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
index 75143c1..2bdcc5b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@ -23,6 +23,7 @@ import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.DefaultHasher;
import org.apache.s4.comm.RemoteEmitterFactory;
import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.tcp.RemoteEmitters;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.ClusterNode;
@@ -33,6 +34,7 @@ import org.mockito.Mockito;
import com.google.common.collect.ImmutableMap;
import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
/**
@@ -50,12 +52,14 @@ public class MockCommModule extends AbstractModule {
/* The hashing function to map keys to partitions. */
bind(Hasher.class).to(DefaultHasher.class);
/* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
+ SerializerDeserializerFactory.class));
bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
bind(RemoteEmitterFactory.class).toInstance(Mockito.mock(RemoteEmitterFactory.class));
bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
+
Assignment mockedAssignment = Mockito.mock(Assignment.class);
Mockito.when(mockedAssignment.assignClusterNode()).thenReturn(new ClusterNode(0, 0, "machine", "Task-0"));
bind(Assignment.class).toInstance(mockedAssignment);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index db45fcb..9e0bebc 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -25,9 +25,8 @@ import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
-import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.core.DefaultCoreModule;
import org.apache.s4.core.Main;
@@ -113,9 +112,12 @@ public class WordCountTest extends ZkBasedTest {
public void injectSentence(String sentence) throws IOException {
Event event = new Event();
+ event.setStreamId("inputStream");
event.put("sentence", String.class, sentence);
- emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
- .serialize(event)));
+ emitter.send(
+ 0,
+ injector.getInstance(SerializerDeserializerFactory.class)
+ .createSerializerDeserializer(getClass().getClassLoader()).serialize(event));
}
}