You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:12:35 UTC
[04/50] incubator-apex-core git commit: APEX-29 #resolve Use DefaultEventLoop.createEventLoop factory
APEX-29 #resolve Use DefaultEventLoop.createEventLoop factory
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/8ae64ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/8ae64ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/8ae64ab6
Branch: refs/heads/master
Commit: 8ae64ab64337c8c259b80661bfe125b64e417c93
Parents: 66a75e0
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sun Aug 2 11:32:09 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Aug 4 14:07:04 2015 -0700
----------------------------------------------------------------------
api/pom.xml | 2 +-
.../java/com/datatorrent/bufferserver/server/Server.java | 2 +-
.../main/java/com/datatorrent/bufferserver/util/System.java | 2 +-
.../com/datatorrent/bufferserver/client/SubscriberTest.java | 4 ++--
.../com/datatorrent/bufferserver/server/ServerTest.java | 4 ++--
.../datatorrent/bufferserver/storage/DiskStorageTest.java | 9 +++++----
.../com/datatorrent/stram/engine/StreamingContainer.java | 2 +-
.../java/com/datatorrent/stram/stream/FastPublisher.java | 2 ++
.../java/com/datatorrent/stram/stream/FastStreamTest.java | 6 ++++--
.../java/com/datatorrent/stram/stream/SocketStreamTest.java | 8 ++++----
10 files changed, 23 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/api/pom.xml
----------------------------------------------------------------------
diff --git a/api/pom.xml b/api/pom.xml
index ff3f441..f04f622 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -74,7 +74,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netlet</artifactId>
- <version>1.1.0</version>
+ <version>1.2.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index a8adf08..7fb4823 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -144,7 +144,7 @@ public class Server implements ServerListener
port = 0;
}
- DefaultEventLoop eventloop = new DefaultEventLoop("alone");
+ DefaultEventLoop eventloop = DefaultEventLoop.createEventLoop("alone");
eventloop.start(null, port, new Server(port));
new Thread(eventloop).start();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
index ff126cb..e9d6528 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
@@ -36,7 +36,7 @@ public class System
DefaultEventLoop el = eventloops.get(identifier);
if (el == null) {
try {
- eventloops.put(identifier, el = new DefaultEventLoop(identifier));
+ eventloops.put(identifier, el = DefaultEventLoop.createEventLoop(identifier));
}
catch (IOException io) {
throw new RuntimeException(io);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
index 3b6b57a..cde4f69 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
@@ -52,8 +52,8 @@ public class SubscriberTest
public static void setupServerAndClients() throws Exception
{
try {
- eventloopServer = new DefaultEventLoop("server");
- eventloopClient = new DefaultEventLoop("client");
+ eventloopServer = DefaultEventLoop.createEventLoop("server");
+ eventloopClient = DefaultEventLoop.createEventLoop("client");
}
catch (IOException ioe) {
throw new RuntimeException(ioe);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
index 600f18c..de26da8 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
@@ -58,8 +58,8 @@ public class ServerTest
public static void setupServerAndClients() throws Exception
{
try {
- eventloopServer = new DefaultEventLoop("server");
- eventloopClient = new DefaultEventLoop("client");
+ eventloopServer = DefaultEventLoop.createEventLoop("server");
+ eventloopClient = DefaultEventLoop.createEventLoop("client");
}
catch (IOException ioe) {
throw new RuntimeException(ioe);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
index 08dc5b8..dac996a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
@@ -16,12 +16,10 @@
package com.datatorrent.bufferserver.storage;
import java.net.InetSocketAddress;
-import static java.lang.Thread.sleep;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
import com.datatorrent.bufferserver.packet.BeginWindowTuple;
import com.datatorrent.bufferserver.packet.EndWindowTuple;
@@ -32,6 +30,9 @@ import com.datatorrent.bufferserver.support.Publisher;
import com.datatorrent.bufferserver.support.Subscriber;
import com.datatorrent.netlet.DefaultEventLoop;
+import static java.lang.Thread.sleep;
+import static org.testng.Assert.assertEquals;
+
/**
*
*/
@@ -49,10 +50,10 @@ public class DiskStorageTest
@BeforeClass
public static void setupServerAndClients() throws Exception
{
- eventloopServer = new DefaultEventLoop("server");
+ eventloopServer = DefaultEventLoop.createEventLoop("server");
eventloopServer.start();
- eventloopClient = new DefaultEventLoop("client");
+ eventloopClient = DefaultEventLoop.createEventLoop("client");
eventloopClient.start();
instance = new Server(0, 1024,8);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 35861f1..9db88ee 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -125,7 +125,7 @@ public class StreamingContainer extends YarnContainerMain
static {
try {
- eventloop = new DefaultEventLoop("ProcessWideEventLoop");
+ eventloop = DefaultEventLoop.createEventLoop("ProcessWideEventLoop");
}
catch (IOException io) {
throw new RuntimeException(io);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
index 188fb7a..887c363 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
@@ -189,6 +189,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
if (!write) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
write = true;
+ key.selector().wakeup();
}
}
}
@@ -484,6 +485,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
if (!write) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
write = true;
+ key.selector().wakeup();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
index e23358b..c7ed83c 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
@@ -16,7 +16,7 @@
package com.datatorrent.stram.stream;
import java.io.IOException;
-import static java.lang.Thread.sleep;
+
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
@@ -36,6 +36,8 @@ import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
+import static java.lang.Thread.sleep;
+
/**
*
*/
@@ -52,7 +54,7 @@ public class FastStreamTest
static {
try {
- eventloop = new DefaultEventLoop("StreamTestEventLoop");
+ eventloop = DefaultEventLoop.createEventLoop("StreamTestEventLoop");
}
catch (IOException ex) {
throw new RuntimeException(ex);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
index 7702b85..2cdddc5 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
@@ -19,8 +19,6 @@ import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.engine.SweepableReservoir;
import com.datatorrent.stram.support.StramTestSupport;
-import com.datatorrent.stram.stream.BufferServerPublisher;
-import com.datatorrent.stram.stream.BufferServerSubscriber;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
import com.datatorrent.api.Sink;
@@ -29,13 +27,15 @@ import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.EventLoop;
import java.io.IOException;
-import static java.lang.Thread.sleep;
+
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.lang.Thread.sleep;
+
/**
*
*/
@@ -49,7 +49,7 @@ public class SocketStreamTest
static {
try {
- eventloop = new DefaultEventLoop("StreamTestEventLoop");
+ eventloop = DefaultEventLoop.createEventLoop("StreamTestEventLoop");
}
catch (IOException ex) {
throw new RuntimeException(ex);