You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:12:35 UTC

[04/50] incubator-apex-core git commit: APEX-29 #resolve Use DefaultEventLoop.createEventLoop factory

APEX-29 #resolve Use DefaultEventLoop.createEventLoop factory


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/8ae64ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/8ae64ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/8ae64ab6

Branch: refs/heads/master
Commit: 8ae64ab64337c8c259b80661bfe125b64e417c93
Parents: 66a75e0
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sun Aug 2 11:32:09 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Aug 4 14:07:04 2015 -0700

----------------------------------------------------------------------
 api/pom.xml                                                 | 2 +-
 .../java/com/datatorrent/bufferserver/server/Server.java    | 2 +-
 .../main/java/com/datatorrent/bufferserver/util/System.java | 2 +-
 .../com/datatorrent/bufferserver/client/SubscriberTest.java | 4 ++--
 .../com/datatorrent/bufferserver/server/ServerTest.java     | 4 ++--
 .../datatorrent/bufferserver/storage/DiskStorageTest.java   | 9 +++++----
 .../com/datatorrent/stram/engine/StreamingContainer.java    | 2 +-
 .../java/com/datatorrent/stram/stream/FastPublisher.java    | 2 ++
 .../java/com/datatorrent/stram/stream/FastStreamTest.java   | 6 ++++--
 .../java/com/datatorrent/stram/stream/SocketStreamTest.java | 8 ++++----
 10 files changed, 23 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/api/pom.xml
----------------------------------------------------------------------
diff --git a/api/pom.xml b/api/pom.xml
index ff3f441..f04f622 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -74,7 +74,7 @@
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>netlet</artifactId>
-      <version>1.1.0</version>
+      <version>1.2.0-SNAPSHOT</version>
     </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index a8adf08..7fb4823 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -144,7 +144,7 @@ public class Server implements ServerListener
       port = 0;
     }
 
-    DefaultEventLoop eventloop = new DefaultEventLoop("alone");
+    DefaultEventLoop eventloop = DefaultEventLoop.createEventLoop("alone");
     eventloop.start(null, port, new Server(port));
     new Thread(eventloop).start();
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
index ff126cb..e9d6528 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java
@@ -36,7 +36,7 @@ public class System
       DefaultEventLoop el = eventloops.get(identifier);
       if (el == null) {
         try {
-          eventloops.put(identifier, el = new DefaultEventLoop(identifier));
+          eventloops.put(identifier, el = DefaultEventLoop.createEventLoop(identifier));
         }
         catch (IOException io) {
           throw new RuntimeException(io);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
index 3b6b57a..cde4f69 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
@@ -52,8 +52,8 @@ public class SubscriberTest
   public static void setupServerAndClients() throws Exception
   {
     try {
-      eventloopServer = new DefaultEventLoop("server");
-      eventloopClient = new DefaultEventLoop("client");
+      eventloopServer = DefaultEventLoop.createEventLoop("server");
+      eventloopClient = DefaultEventLoop.createEventLoop("client");
     }
     catch (IOException ioe) {
       throw new RuntimeException(ioe);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
index 600f18c..de26da8 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
@@ -58,8 +58,8 @@ public class ServerTest
   public static void setupServerAndClients() throws Exception
   {
     try {
-      eventloopServer = new DefaultEventLoop("server");
-      eventloopClient = new DefaultEventLoop("client");
+      eventloopServer = DefaultEventLoop.createEventLoop("server");
+      eventloopClient = DefaultEventLoop.createEventLoop("client");
     }
     catch (IOException ioe) {
       throw new RuntimeException(ioe);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
index 08dc5b8..dac996a 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
@@ -16,12 +16,10 @@
 package com.datatorrent.bufferserver.storage;
 
 import java.net.InetSocketAddress;
-import static java.lang.Thread.sleep;
 
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
 
 import com.datatorrent.bufferserver.packet.BeginWindowTuple;
 import com.datatorrent.bufferserver.packet.EndWindowTuple;
@@ -32,6 +30,9 @@ import com.datatorrent.bufferserver.support.Publisher;
 import com.datatorrent.bufferserver.support.Subscriber;
 import com.datatorrent.netlet.DefaultEventLoop;
 
+import static java.lang.Thread.sleep;
+import static org.testng.Assert.assertEquals;
+
 /**
  *
  */
@@ -49,10 +50,10 @@ public class DiskStorageTest
   @BeforeClass
   public static void setupServerAndClients() throws Exception
   {
-    eventloopServer = new DefaultEventLoop("server");
+    eventloopServer = DefaultEventLoop.createEventLoop("server");
     eventloopServer.start();
 
-    eventloopClient = new DefaultEventLoop("client");
+    eventloopClient = DefaultEventLoop.createEventLoop("client");
     eventloopClient.start();
 
     instance = new Server(0, 1024,8);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 35861f1..9db88ee 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -125,7 +125,7 @@ public class StreamingContainer extends YarnContainerMain
 
   static {
     try {
-      eventloop = new DefaultEventLoop("ProcessWideEventLoop");
+      eventloop = DefaultEventLoop.createEventLoop("ProcessWideEventLoop");
     }
     catch (IOException io) {
       throw new RuntimeException(io);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
index 188fb7a..887c363 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
@@ -189,6 +189,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
       if (!write) {
         key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
         write = true;
+        key.selector().wakeup();
       }
     }
   }
@@ -484,6 +485,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
     if (!write) {
       key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
       write = true;
+      key.selector().wakeup();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
index e23358b..c7ed83c 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
@@ -16,7 +16,7 @@
 package com.datatorrent.stram.stream;
 
 import java.io.IOException;
-import static java.lang.Thread.sleep;
+
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -36,6 +36,8 @@ import com.datatorrent.stram.support.StramTestSupport;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
 
+import static java.lang.Thread.sleep;
+
 /**
  *
  */
@@ -52,7 +54,7 @@ public class FastStreamTest
 
   static {
     try {
-      eventloop = new DefaultEventLoop("StreamTestEventLoop");
+      eventloop = DefaultEventLoop.createEventLoop("StreamTestEventLoop");
     }
     catch (IOException ex) {
       throw new RuntimeException(ex);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
index 7702b85..2cdddc5 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
@@ -19,8 +19,6 @@ import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
 import com.datatorrent.stram.engine.StreamContext;
 import com.datatorrent.stram.engine.SweepableReservoir;
 import com.datatorrent.stram.support.StramTestSupport;
-import com.datatorrent.stram.stream.BufferServerPublisher;
-import com.datatorrent.stram.stream.BufferServerSubscriber;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
 import com.datatorrent.api.Sink;
@@ -29,13 +27,15 @@ import com.datatorrent.bufferserver.server.Server;
 import com.datatorrent.netlet.DefaultEventLoop;
 import com.datatorrent.netlet.EventLoop;
 import java.io.IOException;
-import static java.lang.Thread.sleep;
+
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.lang.Thread.sleep;
+
 /**
  *
  */
@@ -49,7 +49,7 @@ public class SocketStreamTest
 
   static {
     try {
-      eventloop = new DefaultEventLoop("StreamTestEventLoop");
+      eventloop = DefaultEventLoop.createEventLoop("StreamTestEventLoop");
     }
     catch (IOException ex) {
       throw new RuntimeException(ex);