You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:35 UTC

[02/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EStage.java
new file mode 100644
index 0000000..5e79d39
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EStage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/**
+ * Stage that is itself an event handler: it combines {@link EventHandler} and {@link Stage}
+ *
+ * @param <T> the type of event accepted by this stage
+ */
+public interface EStage<T> extends EventHandler<T>, Stage {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EventHandler.java
new file mode 100644
index 0000000..871e64e
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EventHandler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/**
+ * Callback that consumes events of a single type
+ *
+ * @param <T> the type of event this handler accepts
+ */
+public interface EventHandler<T> {
+
+  /**
+   * Processes one event
+   *
+   * @param value the event to process
+   */
+  void onNext(T value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifiable.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifiable.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifiable.java
new file mode 100644
index 0000000..784dafb
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifiable.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/** Implemented by objects that expose an {@link Identifier}. */
+public interface Identifiable {
+
+  /**
+   * Gets the identifier associated with this object
+   *
+   * @return the identifier of this object
+   */
+  Identifier getId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifier.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifier.java
new file mode 100644
index 0000000..377b23e
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifier.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/**
+ * An identifier class for REEF.  Identifiers are a generic naming primitive
+ * that carry some information about the type of the object they point to.
+ * Typical examples are server sockets, filenames, and requests.
+ * <p>
+ * Identifier constructors should take zero arguments, or take a single string.
+ */
+public interface Identifier {
+
+  /**
+   * Computes the hash code of this identifier
+   *
+   * @return the hash code value of this identifier
+   */
+  int hashCode();
+
+  /**
+   * Tests whether another object is equal to this identifier
+   *
+   * @param o the object to compare with
+   * @return true if the argument is equal to this identifier; false,
+   * otherwise
+   */
+  boolean equals(Object o);
+
+  /**
+   * Renders this identifier as a string. This method should return a
+   * URL-style string, that begins with "type://", where "type" is chosen to
+   * uniquely identify this type of identifier.
+   *
+   * @return the string form of this identifier
+   */
+  String toString();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierFactory.java
new file mode 100644
index 0000000..828eda9
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/** Factory that builds {@link Identifier} instances from strings. */
+public interface IdentifierFactory {
+
+  /**
+   * Creates an identifier from the given string
+   *
+   * @param str the string to build the identifier from
+   * @return the resulting identifier
+   */
+  Identifier getNewInstance(String str);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierParser.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierParser.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierParser.java
new file mode 100644
index 0000000..a2e7df9
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierParser.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+import org.apache.reef.tang.ExternalConstructor;
+import org.apache.reef.wake.impl.DefaultIdentifierFactory;
+import org.apache.reef.wake.remote.impl.SocketRemoteIdentifier;
+import org.apache.reef.wake.storage.FileIdentifier;
+
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Tang external constructor that turns a string into an {@link Identifier}. */
+public class IdentifierParser implements ExternalConstructor<Identifier> {
+  // TODO: Modify tang to allow this to use a factory pattern.
+  private static final IdentifierFactory FACTORY = createFactory();
+
+  final Identifier id;
+
+  @Inject
+  IdentifierParser(String s) {
+    this.id = FACTORY.getNewInstance(s);
+  }
+
+  /** Builds the shared factory from a map of "socket" and "file" to their identifier classes. */
+  private static IdentifierFactory createFactory() {
+    final Map<String, Class<? extends Identifier>> typeMap = new ConcurrentHashMap<>();
+    typeMap.put("socket", SocketRemoteIdentifier.class);
+    typeMap.put("file", FileIdentifier.class);
+    return new DefaultIdentifierFactory(typeMap);
+  }
+
+  /** @return the identifier created from the constructor argument */
+  @Override
+  public Identifier newInstance() {
+    return id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Stage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Stage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Stage.java
new file mode 100644
index 0000000..df79d0f
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Stage.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/**
+ * Stage is an execution unit for events; being AutoCloseable, it releases its resources on close
+ */
+public interface Stage extends AutoCloseable {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/StageConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/StageConfiguration.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/StageConfiguration.java
new file mode 100644
index 0000000..0ccbb3a
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/StageConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.rx.Observer;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Tang named parameters used to configure a Wake Stage
+ */
+public final class StageConfiguration {
+
+  /** Name of the stage. */
+  @NamedParameter(doc = "The stage name.")
+  public static final class StageName implements Name<String> {}
+
+  /** Event handler of the stage. */
+  @NamedParameter(doc = "The event handler for the stage.")
+  public static final class StageHandler implements Name<EventHandler<?>> {}
+
+  /** Error handler of the stage; it handles {@code Throwable}s. */
+  @NamedParameter(doc = "The error handler for the stage.")
+  public static final class ErrorHandler implements Name<EventHandler<Throwable>> {}
+
+  /** Number of threads of the stage. */
+  @NamedParameter(doc = "The number of threads for the stage.")
+  public static final class NumberOfThreads implements Name<Integer> {}
+
+  /** Capacity of the stage. */
+  @NamedParameter(doc = "The capacity for the stage.")
+  public static final class Capacity implements Name<Integer> {}
+
+  /** Executor service of the stage. */
+  @NamedParameter(doc = "The executor service for the stage.")
+  public static final class StageExecutorService implements Name<ExecutorService> {}
+
+  /** Initial delay for periodic events of the timer stage. */
+  @NamedParameter(doc = "The initial delay for periodic events of the timer stage.")
+  public static final class TimerInitialDelay implements Name<Long> {}
+
+  /** Period for periodic events of the timer stage. */
+  @NamedParameter(doc = "The period for periodic events of the timer stage.")
+  public static final class TimerPeriod implements Name<Long> {}
+
+  /** Observer of the stage. */
+  @NamedParameter(doc = "The observer for the stage.")
+  public static final class StageObserver implements Name<Observer<?>> {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeConfiguration.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeConfiguration.java
new file mode 100644
index 0000000..7adee0b
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeConfiguration.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationFile;
+import org.apache.reef.wake.exception.WakeRuntimeException;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/** Wake parameter configuration, optionally read from a configuration file. */
+public final class WakeConfiguration {
+  private static final Logger LOG = Logger.getLogger(WakeConfiguration.class.getName());
+
+  /**
+   * Adds the named configuration file to a fresh Tang configuration builder (not kept afterwards).
+   * An empty name only logs a warning; read or bind failures raise WakeRuntimeException.
+   */
+  @Inject
+  public WakeConfiguration(final @Parameter(FileName.class) String confFileName) {
+    if (confFileName.isEmpty()) {
+      LOG.log(Level.WARNING, "The Wake configuration file is not specified.");
+      return;
+    }
+    final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder();
+    try {
+      ConfigurationFile.addConfiguration(builder, new File(confFileName));
+    } catch (final BindException | IOException e) {
+      throw new WakeRuntimeException(e);
+    }
+  }
+
+  /** Named parameter for the configuration file name; defaults to the empty string. */
+  @NamedParameter(doc = "Configuration file name", default_value = "")
+  public static final class FileName implements Name<String> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeParameters.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeParameters.java
new file mode 100644
index 0000000..897cc08
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeParameters.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/** Default parameters for Wake. */
+public final class WakeParameters {
+  /** Default value of {@link MaxFrameLength}. */
+  public static final int MAX_FRAME_LENGTH = 1024 * 1024;
+
+  /** Default value of {@link ExecutorShutdownTimeout}. */
+  public static final long EXECUTOR_SHUTDOWN_TIMEOUT = 1000;
+
+  /** Default value of {@link RemoteSendTimeout}. */
+  public static final long REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT = 10000;
+
+  @NamedParameter(doc = "Maximum frame length unit", default_value = "" + MAX_FRAME_LENGTH)
+  public static final class MaxFrameLength implements Name<Integer> {
+  }
+
+  @NamedParameter(doc = "Executor shutdown timeout", default_value = "" + EXECUTOR_SHUTDOWN_TIMEOUT)
+  public static final class ExecutorShutdownTimeout implements Name<Integer> {
+  }
+
+  @NamedParameter(doc = "Remote send timeout", default_value = "" + REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT)
+  public static final class RemoteSendTimeout implements Name<Integer> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/accumulate/CombinerStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/accumulate/CombinerStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/accumulate/CombinerStage.java
new file mode 100644
index 0000000..63b4d30
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/accumulate/CombinerStage.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.accumulate;
+
+
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.rx.Observer;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Stage that merges values per key through a {@link Combiner} and emits the merged entries
+ * to a downstream observer from a dedicated output thread.
+ */
+public class CombinerStage<K extends Comparable<K>, V> implements Stage {
+  private final Combiner<K, V> c;
+  private final Observer<Map.Entry<K, V>> o;
+  private final OutputThread worker = new OutputThread();
+  private final ConcurrentSkipListMap<K, V> register = new ConcurrentSkipListMap<>();
+  private volatile boolean done = false;
+
+  /**
+   * @param c combines an incoming value with the value already registered for its key
+   * @param o receives the combined entries, forwarded errors and the completion signal
+   */
+  public CombinerStage(Combiner<K, V> c, Observer<Map.Entry<K, V>> o) {
+    this.c = c;
+    this.o = o;
+    worker.start();
+  }
+
+  /** @return the observer through which entries are fed into this stage */
+  public Observer<Map.Entry<K, V>> wireIn() {
+    return new Observer<Map.Entry<K, V>>() {
+      @Override
+      public void onNext(Map.Entry<K, V> pair) {
+        final K key = pair.getKey();
+        boolean succ = false;
+        while (!succ) { // retry until the combined value replaces exactly the value read
+          final V old = register.get(key);
+          final V newVal = c.combine(key, old, pair.getValue());
+          succ = (old == null)
+              ? register.putIfAbsent(key, newVal) == null
+              : register.replace(key, old, newVal);
+        }
+        // Always signal: an isEmpty() snapshot taken before the insert can be stale, and a
+        // skipped notify would leave the output thread waiting with data in the register.
+        synchronized (register) {
+          register.notify();
+        }
+      }
+
+      @Override
+      public void onError(Exception error) {
+        o.onError(error);
+      }
+
+      @Override
+      public void onCompleted() {
+        synchronized (register) {
+          done = true;
+          register.notify(); // unconditional, so a waiting output thread always sees done
+        }
+      }
+    };
+  }
+
+  /** Waits for the output thread, which ends once the input observer has completed. */
+  @Override
+  public void close() throws Exception {
+    worker.join();
+  }
+
+  /** Merges the value registered for a key ({@code old}, null if absent) with a new one. */
+  public interface Combiner<K extends Comparable<K>, V> {
+    V combine(K key, V old, V cur);
+  }
+
+  /** Immutable key/value entry ordered by key. */
+  public static class Pair<K extends Comparable<K>, V> implements Map.Entry<K, V>, Comparable<Map.Entry<K, V>> {
+    private final K k;
+    private final V v;
+
+    public Pair(K k, V v) {
+      this.k = k;
+      this.v = v;
+    }
+
+    @Override
+    public int compareTo(Map.Entry<K, V> arg0) {
+      return k.compareTo(arg0.getKey());
+    }
+
+    @Override
+    public K getKey() {
+      return k;
+    }
+
+    @Override
+    public V getValue() {
+      return v;
+    }
+
+    @Override
+    public V setValue(V value) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /** Thread that emits the registered entries to the downstream observer. */
+  private class OutputThread extends Thread {
+    public OutputThread() {
+      super("grouper-output-thread");
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        synchronized (register) {
+          while (register.isEmpty() && !done) {
+            try {
+              register.wait();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt(); // keep the interrupt observable
+              throw new IllegalStateException(e);
+            }
+          }
+          if (done && register.isEmpty()) {
+            break; // exit only after every registered entry has been emitted
+          }
+        }
+        Map.Entry<K, V> cursor = register.pollFirstEntry();
+        while (cursor != null) {
+          o.onNext(cursor);
+          K nextKey = register.higherKey(cursor.getKey());
+          // If there is more than one OutputThread worker then the remove() -> null case
+          // must be handled
+          cursor = (nextKey == null) ? null : new Pair<>(nextKey, register.remove(nextKey));
+        }
+      }
+      o.onCompleted();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java
new file mode 100644
index 0000000..a5f0ba6
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.join;
+
+import org.apache.reef.wake.rx.Observer;
+import org.apache.reef.wake.rx.StaticObservable;
+
+import java.util.concurrent.ConcurrentSkipListSet;
+
+
+/** Example join: emits each right event that is also present in the completed left input. */
+public class BlockingJoin implements StaticObservable {
+  private final Observer<TupleEvent> out;
+  private final ConcurrentSkipListSet<TupleEvent> left = new ConcurrentSkipListSet<>();
+  volatile boolean leftDone = false; // volatile: wireRight() reads it without the monitor
+
+  /** @param out observer that receives the joined events and the completion signal */
+  public BlockingJoin(Observer<TupleEvent> out) {
+    this.out = out;
+  }
+
+  private synchronized void tellEveryoneLeftIsDone() {
+    leftDone = true;
+    notifyAll();
+  }
+
+  private synchronized void waitUntilLeftIsDone() {
+    while (!leftDone) {
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the flag cleared by wait()
+        throw new IllegalStateException("No support for interrupted threads here!", e);
+      }
+    }
+  }
+
+  /** @return observer for the left input; its completion releases waiting right events */
+  public Observer<TupleEvent> wireLeft() {
+    return new Observer<TupleEvent>() {
+      @Override
+      public void onNext(TupleEvent value) {
+        left.add(value);
+      }
+
+      @Override
+      public void onError(Exception error) { // NOTE(review): dropped, never reaches out; confirm
+      }
+
+      @Override
+      public void onCompleted() {
+        tellEveryoneLeftIsDone();
+      }
+    };
+  }
+
+  /** @return observer for the right input; events are matched once the left input is done */
+  public Observer<TupleEvent> wireRight() {
+    return new Observer<TupleEvent>() {
+      @Override
+      public void onNext(TupleEvent value) {
+        if (!leftDone) { waitUntilLeftIsDone(); }
+        if (left.contains(value)) {
+          out.onNext(value);
+        }
+      }
+
+      @Override
+      public void onError(Exception error) { // NOTE(review): dropped, never reaches out; confirm
+      }
+
+      @Override
+      public void onCompleted() {
+        waitUntilLeftIsDone();
+        out.onCompleted();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/EventPrinter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/EventPrinter.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/EventPrinter.java
new file mode 100644
index 0000000..812be3a
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/EventPrinter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.join;
+
+import org.apache.reef.wake.rx.Observer;
+
+/** Observer that prints each notification, prefixed with this object's string form. */
+public class EventPrinter<T> implements Observer<T> {
+  @Override
+  public void onNext(T value) {
+    System.out.println(String.valueOf(this) + ": " + value); // events go to stdout
+  }
+
+  @Override
+  public void onError(Exception error) {
+    System.err.println(String.valueOf(this) + ": " + error); // errors go to stderr
+  }
+
+  @Override
+  public void onCompleted() {
+    System.out.println(String.valueOf(this) + ": done!");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/NonBlockingJoin.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/NonBlockingJoin.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/NonBlockingJoin.java
new file mode 100644
index 0000000..3aa5647
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/NonBlockingJoin.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.join;
+
+import org.apache.reef.wake.rx.Observer;
+import org.apache.reef.wake.rx.StaticObservable;
+
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Example join operator: events arriving on the right input are matched against
+ * the events collected from the left input, and the matches are forwarded to the
+ * output observer.
+ * <p/>
+ * Right-side events that have no match yet are buffered while the left input is
+ * still open. Once the left input has completed, the buffer is drained and each
+ * buffered event is either emitted (if the left table contains it) or dropped.
+ */
+public class NonBlockingJoin implements StaticObservable {
+  private final AtomicBoolean leftDone = new AtomicBoolean(false);
+  private final AtomicBoolean completed = new AtomicBoolean(false);
+  private final AtomicBoolean sentCompleted = new AtomicBoolean(false);
+
+  private final Observer<TupleEvent> out;
+
+  private final ConcurrentSkipListSet<TupleEvent> leftTable = new ConcurrentSkipListSet<TupleEvent>();
+  private final ConcurrentSkipListSet<TupleEvent> rightTable = new ConcurrentSkipListSet<TupleEvent>();
+
+  /**
+   * @param out the observer that receives the joined events
+   */
+  public NonBlockingJoin(Observer<TupleEvent> out) {
+    this.out = out;
+  }
+
+  /**
+   * Polls the buffered right-side events until the buffer is empty, emitting
+   * those that the left table contains.
+   */
+  private void emitBufferedMatches() {
+    TupleEvent buffered = rightTable.pollFirst();
+    while (buffered != null) {
+      if (leftTable.contains(buffered)) {
+        out.onNext(buffered);
+      }
+      buffered = rightTable.pollFirst();
+    }
+  }
+
+  /**
+   * Discards all collected events and forwards the error to the output.
+   */
+  private void fail(Exception error) {
+    leftTable.clear();
+    rightTable.clear();
+    out.onError(error);
+  }
+
+  /**
+   * Drains the right-side buffer once the left input is done and, if the right
+   * input is done as well, signals completion to the output at most once.
+   */
+  private void drainRight() {
+    if (!leftDone.get()) {
+      return;
+    }
+    emitBufferedMatches();
+    if (!completed.get()) {
+      return;
+    }
+    // There cannot be any more additions to rightTable after
+    // completed is set to true, so this ensures that rightTable is
+    // really empty. (Someone could have inserted into it during the
+    // race between the previous drain and the check of completed.)
+    emitBufferedMatches();
+    if (!sentCompleted.getAndSet(true)) {
+      out.onCompleted();
+    }
+  }
+
+  /**
+   * @return the observer to feed with the left input of the join
+   */
+  public Observer<TupleEvent> wireLeft() {
+    return new Observer<TupleEvent>() {
+
+      @Override
+      public void onNext(TupleEvent value) {
+        leftTable.add(value);
+      }
+
+      @Override
+      public void onError(Exception error) {
+        fail(error);
+      }
+
+      @Override
+      public void onCompleted() {
+        leftDone.set(true);
+        drainRight();
+      }
+
+    };
+  }
+
+  /**
+   * @return the observer to feed with the right input of the join
+   */
+  public Observer<TupleEvent> wireRight() {
+    return new Observer<TupleEvent>() {
+
+      @Override
+      public void onNext(TupleEvent value) {
+        if (leftTable.contains(value)) {
+          // The left table already holds this event: emit right away.
+          out.onNext(value);
+        } else if (!leftDone.get()) {
+          // The match may still arrive on the left side: buffer the event.
+          rightTable.add(value);
+        }
+        drainRight();
+      }
+
+      @Override
+      public void onError(Exception error) {
+        fail(error);
+      }
+
+      @Override
+      public void onCompleted() {
+        completed.set(true);
+        drainRight();
+      }
+    };
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleEvent.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleEvent.java
new file mode 100644
index 0000000..978b2de
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleEvent.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.join;
+
+
+/**
+ * Immutable (key, value) tuple used by the join example.
+ * <p/>
+ * Tuples are ordered by key first and by value second; equals and hashCode
+ * use the same two fields so that they agree with that ordering.
+ */
+public class TupleEvent implements Comparable<TupleEvent> {
+  private final int key;
+  private final String val;
+
+  /**
+   * @param key the integer key of the tuple
+   * @param val the value of the tuple
+   */
+  public TupleEvent(int key, String val) {
+    this.key = key;
+    this.val = val;
+  }
+
+  /**
+   * Orders tuples by key and breaks ties by comparing the values.
+   *
+   * @param o the tuple to compare with
+   * @return a negative number, zero or a positive number if this tuple sorts
+   * before, equal to or after {@code o}
+   */
+  @Override
+  public int compareTo(TupleEvent o) {
+    int keycmp = Integer.compare(key, o.key);
+    if (keycmp != 0) {
+      return keycmp;
+    }
+    return val.compareTo(o.val);
+  }
+
+  /**
+   * Tuples are equal when both key and value are equal, which keeps equals
+   * consistent with {@link #compareTo(TupleEvent)}.
+   *
+   * @param obj the object to compare with
+   * @return true if {@code obj} is a TupleEvent with the same key and value
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof TupleEvent)) {
+      return false;
+    }
+    final TupleEvent other = (TupleEvent) obj;
+    return key == other.key && (val == null ? other.val == null : val.equals(other.val));
+  }
+
+  /**
+   * @return a hash code derived from the key and the value
+   */
+  @Override
+  public int hashCode() {
+    return 31 * key + (val == null ? 0 : val.hashCode());
+  }
+
+  @Override
+  public String toString() {
+    return "(" + key + ", " + val + ")";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java
new file mode 100644
index 0000000..378ef3d
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.join;
+
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.rx.Observer;
+import org.apache.reef.wake.rx.StaticObservable;
+
+/**
+ * Example source that generates tuples on a number of producer threads and
+ * pushes them to an observer. The threads are started by the constructor;
+ * {@link #close()} waits for them to finish and then signals completion.
+ */
+public class TupleSource implements StaticObservable, Stage {
+  final Thread[] threads;
+  final Observer<TupleEvent> out;
+
+  /**
+   * Creates the source and immediately starts its producer threads.
+   *
+   * @param out        the observer that receives the generated tuples
+   * @param max        bound from which the number of tuples per thread is derived
+   * @param numThreads the number of producer threads
+   * @param evenOnly   if true, every generated key is doubled so that only even keys are emitted
+   */
+  public TupleSource(final Observer<TupleEvent> out, final int max, final int numThreads, final boolean evenOnly) {
+    this.out = out;
+    this.threads = new Thread[numThreads];
+    for (int id = 0; id < numThreads; id++) {
+      final Thread producer = new Thread(newProducer(out, max, numThreads, evenOnly, id));
+      this.threads[id] = producer;
+      producer.start();
+    }
+  }
+
+  /**
+   * Builds the task run by one producer thread. Thread {@code threadId} emits the
+   * keys {@code n * numThreads + threadId} (doubled if {@code evenOnly}) for
+   * {@code n} from 0 up to {@code max / ((evenOnly ? 2 : 1) * numThreads)}, exclusive.
+   */
+  private static Runnable newProducer(final Observer<TupleEvent> sink, final int max, final int numThreads,
+                                      final boolean evenOnly, final int threadId) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        final int perThread = max / ((evenOnly ? 2 : 1) * numThreads);
+        for (int n = 0; n < perThread; n++) {
+          final int base = n * numThreads + threadId;
+          final int key = evenOnly ? base * 2 : base;
+          sink.onNext(new TupleEvent(key, key + ""));
+        }
+      }
+    };
+  }
+
+  /**
+   * Waits for all producer threads to terminate and then completes the observer.
+   *
+   * @throws Exception if the calling thread is interrupted while waiting
+   */
+  @Override
+  public void close() throws Exception {
+    for (final Thread producer : threads) {
+      producer.join();
+    }
+    out.onCompleted();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/package-info.java
new file mode 100644
index 0000000..4012f98
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Example: a non-blocking join of two tuple streams built from Wake Rx observers.
+ */
+package org.apache.reef.wake.examples.join;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/EventSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/EventSource.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/EventSource.java
new file mode 100644
index 0000000..202b321
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/EventSource.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.p2p;
+
+/**
+ * A source of events that is polled by {@link Pull2Push}: clients implement
+ * this interface and register the implementation with a Pull2Push instance.
+ *
+ * @param <T> the type of the event
+ */
+public interface EventSource<T> {
+
+  /**
+   * @return the next event, or null if this source has no more events to offer.
+   */
+  T getNext();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java
new file mode 100644
index 0000000..78e5bbe
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.p2p;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Performs a Pull-to-Push conversion in Wake.
+ * <p/>
+ * The class pulls from a set of event sources, and pushes to a single
+ * EventHandler. If the downstream event handler blocks, this will block,
+ * providing a simple rate limiting scheme.
+ * <p/>
+ * The EventSources are managed in a basic Queue.
+ *
+ * @param <T> the message type
+ */
+public final class Pull2Push<T> implements Runnable, AutoCloseable {
+
+  private static final Logger LOG = Logger.getLogger(Pull2Push.class.getName());
+
+  private final EventHandler<T> output; // The downstream EventHandler
+  // The upstream event sources. LinkedList is not synchronized, so sources should be
+  // registered before run() is started unless access is synchronized externally.
+  private final Queue<EventSource<T>> sources = new LinkedList<>();
+  // Set by close() while run() is looping, typically from a different thread; volatile
+  // guarantees that the message loop observes the update and terminates.
+  private volatile boolean closed = false;
+
+  /**
+   * @param output the EventHandler that receives the messages from this
+   *               Pull2Push.
+   */
+  public Pull2Push(final EventHandler<T> output) {
+    this.output = output;
+  }
+
+  /**
+   * Registers an event source.
+   *
+   * @param source The source that will be added to the queue of this
+   *               Pull2Push
+   */
+  public final void register(final EventSource<T> source) {
+    this.sources.add(source);
+  }
+
+  /**
+   * Executes the message loop until {@link #close()} is called. The sources are
+   * polled round-robin; a source that returns null is dropped from the queue.
+   */
+  @Override
+  public void run() {
+
+    while (!this.closed) {
+      // Grab the next available message source, if any
+      final EventSource<T> nextSource = sources.poll();
+      if (null != nextSource) {
+        // Grab the next message from that source, if any
+        final T message = nextSource.getNext();
+        if (null != message) {
+          // Add the source to the end of the queue again.
+          sources.add(nextSource);
+          // Send the message. Note that this may block depending on the underlying EventHandler.
+          this.output.onNext(message);
+        } else {
+          // The message source has returned null as the next message. We drop the message source in that case.
+          LOG.log(Level.INFO, "Droping message source {0} from the queue", nextSource.toString());
+        }
+      } else {
+        // No source was available. We could put a wait() here.
+      }
+    }
+  }
+
+  /**
+   * Asks the message loop to stop. The loop exits the next time it checks the
+   * flag, i.e. after the message currently being delivered, if any.
+   */
+  @Override
+  public void close() throws Exception {
+    this.closed = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/package-info.java
new file mode 100644
index 0000000..773a20f
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * A simple pull to push adapter.
+ */
+package org.apache.reef.wake.examples.p2p;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/package-info.java
new file mode 100644
index 0000000..e02a934
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Examples that show how to use Wake.
+ */
+package org.apache.reef.wake.examples;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/WakeRuntimeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/WakeRuntimeException.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/WakeRuntimeException.java
new file mode 100644
index 0000000..87aa414
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/WakeRuntimeException.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.exception;
+
+/**
+ * Unchecked exception used by Wake
+ */
+public class WakeRuntimeException extends RuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates a Wake runtime exception that carries only a detail message
+   *
+   * @param s the detail message
+   */
+  public WakeRuntimeException(final String s) {
+    super(s);
+  }
+
+  /**
+   * Creates a Wake runtime exception that wraps the given cause
+   *
+   * @param e the cause
+   */
+  public WakeRuntimeException(final Throwable e) {
+    super(e);
+  }
+
+  /**
+   * Creates a Wake runtime exception with both a detail message and a cause
+   *
+   * @param s the detail message
+   * @param e the cause
+   */
+  public WakeRuntimeException(final String s, final Throwable e) {
+    super(s, e);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/package-info.java
new file mode 100644
index 0000000..90fddee
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Exceptions thrown by Wake.
+ */
+package org.apache.reef.wake.exception;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingEventHandler.java
new file mode 100644
index 0000000..c5da7a5
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingEventHandler.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An EventHandler that spools events until a set number of them has been
+ * received. Once they have been received, the downstream event handler is
+ * called with an Iterable of the events spooled.
+ * <p/>
+ * onNext is thread safe
+ *
+ * @param <T> type of events
+ * @see BlockingSignalEventHandler
+ */
+public final class BlockingEventHandler<T> implements EventHandler<T> {
+
+  private final int expectedSize;
+  private final EventHandler<Iterable<T>> destination;
+  private final AtomicInteger cursor;
+  // TODO: a queue is likely overly conservative given that we only need
+  // to preserve order of those pairs of events that didn't race (have an ordering)
+  // The field is final because it doubles as the monitor guarding the drain below;
+  // a lock object must never be reassigned.
+  private final BlockingQueue<T> events = new LinkedBlockingQueue<>();
+
+  /**
+   * @param expectedSize the number of events per batch
+   * @param destination  the handler that receives every completed batch
+   */
+  public BlockingEventHandler(final int expectedSize, final EventHandler<Iterable<T>> destination) {
+    this.expectedSize = expectedSize;
+    this.destination = destination;
+    this.cursor = new AtomicInteger(0);
+  }
+
+  /**
+   * Spools the event and, if the running event count reaches a multiple of
+   * expectedSize, drains one batch from the queue and hands it to the destination.
+   *
+   * @param event the event to spool
+   */
+  @Override
+  public final void onNext(final T event) {
+    this.events.add(event);
+    int newCursor = this.cursor.incrementAndGet();
+
+    if (newCursor % expectedSize == 0) {
+      // FIXME: There is a race here where the person draining the events might
+      // not include their event as the last one. I'm going to assume this does not
+      // matter, since all events will still be drained exactly once by someone in
+      // the proper order
+
+      ArrayList<T> nonConcurrent = new ArrayList<>(expectedSize);
+      synchronized (events) {
+
+        // drainTo(maxElements) does not suffice because it has undefined behavior for
+        // any modifications (a better spec would possibly be undefined behavior except for appends)
+
+        // TODO: a non-locking implementation will simply atomically update the head of the
+        // queue to index=expectedSize, so that the drainer may drain without synchronization
+        for (int i = 0; i < expectedSize; i++) {
+          nonConcurrent.add(events.poll());
+        }
+      }
+      this.destination.onNext(nonConcurrent);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingSignalEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingSignalEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingSignalEventHandler.java
new file mode 100644
index 0000000..e590852
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingSignalEventHandler.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An EventHandler that counts the events it receives and forwards every
+ * expectedSize-th one: once a batch of expectedSize events is complete, the
+ * downstream event handler is called with the <i>last event</i> of that batch.
+ * The running counter is never reset, so the next batch starts right away.
+ * <p/>
+ * onNext is thread safe
+ *
+ * @param <T> type of events
+ * @see BlockingEventHandler
+ */
+public final class BlockingSignalEventHandler<T> implements EventHandler<T> {
+
+  private final int expectedSize;
+  private final EventHandler<T> destination;
+  private final AtomicInteger cursor = new AtomicInteger(0);
+
+  /**
+   * @param expectedSize the number of events that make up one batch
+   * @param destination  the handler that is signalled when a batch is complete
+   */
+  public BlockingSignalEventHandler(final int expectedSize, final EventHandler<T> destination) {
+    this.expectedSize = expectedSize;
+    this.destination = destination;
+  }
+
+  /**
+   * Counts the event and passes it on if it completes a batch.
+   *
+   * @param event the received event
+   */
+  @Override
+  public final void onNext(final T event) {
+    final boolean batchComplete = this.cursor.incrementAndGet() % this.expectedSize == 0;
+    if (batchComplete) {
+      this.destination.onNext(event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
new file mode 100644
index 0000000..a186659
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
+import org.apache.reef.wake.remote.impl.SocketRemoteIdentifier;
+
+import javax.inject.Inject;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Default remote identifier factory that creates a specific remote identifier
+ * from a string representation
+ * <p/>
+ * A string representation is broken into two parts type and type-specific details separated by "://"
+ * A remote identifier implementation should implement a constructor that accepts a string.
+ * The factory invokes a proper constructor by reflection.
+ */
+public class DefaultIdentifierFactory implements IdentifierFactory {
+
+  // map between type and remote identifier class
+  private final Map<String, Class<? extends Identifier>> typeToClazzMap;
+
+  /**
+   * Constructs a default remote identifier factory that knows the "socket" type
+   */
+  @Inject
+  public DefaultIdentifierFactory() {
+    typeToClazzMap = new HashMap<>();
+    typeToClazzMap.put("socket", SocketRemoteIdentifier.class);
+  }
+
+  /**
+   * Constructs a default remote identifier factory
+   *
+   * @param typeToClazzMap the map of type strings to classes of remote identifiers
+   */
+  public DefaultIdentifierFactory(Map<String, Class<? extends Identifier>> typeToClazzMap) {
+    this.typeToClazzMap = typeToClazzMap;
+  }
+
+  /**
+   * Creates a new remote identifier instance
+   *
+   * @param str a string representation of the form {@code type://details}
+   * @return a remote identifier
+   * @throws RemoteRuntimeException if the string has no "://" separator, if the type is
+   *                                not registered, or if the identifier cannot be constructed
+   */
+  @Override
+  public Identifier getNewInstance(String str) {
+    final int index = str.indexOf("://");
+    if (index < 0) {
+      throw new RemoteRuntimeException("Invalid name " + str);
+    }
+    final String type = str.substring(0, index);
+    final Class<? extends Identifier> clazz = typeToClazzMap.get(type);
+    if (clazz == null) {
+      // Fail with a descriptive error instead of a NullPointerException below.
+      throw new RemoteRuntimeException("Unknown identifier type " + type + " in " + str);
+    }
+    try {
+      final Constructor<? extends Identifier> constructor = clazz.getDeclaredConstructor(String.class);
+      // The constructor receives the type-specific part that follows "://".
+      return constructor.newInstance(str.substring(index + 3));
+    } catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      // The cause travels with the exception; no need to print the stack trace here.
+      throw new RemoteRuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java
new file mode 100644
index 0000000..71c123e
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A default thread factory implementation that names created threads.
+ * <p>
+ * Thread names have the form {@code <prefix>-pool-<P>-thread-<N>}: {@code P} is taken
+ * from a counter shared by all factories when the factory is constructed, and
+ * {@code N} counts the threads created by this factory, starting at 1. Created
+ * threads are made non-daemon and their priority is set to
+ * {@link Thread#NORM_PRIORITY}.
+ */
+public final class DefaultThreadFactory implements ThreadFactory {
+  private static final AtomicInteger poolNumber = new AtomicInteger(1);
+  private final ThreadGroup group;
+  private final AtomicInteger threadNumber = new AtomicInteger(1);
+  private final String prefix;
+  // volatile: the setter may be called while other threads are inside newThread()
+  private volatile Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+
+  /**
+   * Constructs a default thread factory whose threads get no explicit
+   * uncaught exception handler.
+   *
+   * @param prefix the name prefix of the created thread
+   */
+  public DefaultThreadFactory(String prefix) {
+    this(prefix, null);
+  }
+
+  /**
+   * Constructs a default thread factory
+   *
+   * @param prefix                   the name prefix of the created thread
+   * @param uncaughtExceptionHandler the uncaught exception handler of the created thread;
+   *                                 {@code null} means none is installed
+   */
+  public DefaultThreadFactory(String prefix, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+    SecurityManager s = System.getSecurityManager();
+    this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+    this.prefix = prefix + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
+    this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+  }
+
+  /**
+   * Sets the uncaught exception handler installed on threads created from now on.
+   *
+   * @param uncaughtExceptionHandler the uncaught exception handler
+   */
+  public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+    this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+  }
+
+  /**
+   * Creates a new thread
+   *
+   * @param r the runnable
+   * @return a new, not yet started thread that will run {@code r}
+   */
+  @Override
+  public Thread newThread(Runnable r) {
+    Thread t = new Thread(group, r, prefix + threadNumber.getAndIncrement(), 0);
+    // A new thread inherits daemon status and priority from its creator; normalize both.
+    if (t.isDaemon()) {
+      t.setDaemon(false);
+    }
+    if (t.getPriority() != Thread.NORM_PRIORITY) {
+      t.setPriority(Thread.NORM_PRIORITY);
+    }
+    // Read the field once so the null check and the use see the same value.
+    final Thread.UncaughtExceptionHandler handler = uncaughtExceptionHandler;
+    if (handler != null) {
+      t.setUncaughtExceptionHandler(handler);
+    }
+    return t;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ForkPoolStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ForkPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ForkPoolStage.java
new file mode 100644
index 0000000..214beb8
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ForkPoolStage.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.AbstractEStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.StageConfiguration;
+
+import javax.inject.Inject;
+import java.util.concurrent.ForkJoinTask;
+import java.util.logging.Logger;
+
+/**
+ * A Wake event handling stage that hands every event to a shared pool as a
+ * {@link java.util.concurrent.ForkJoinTask}. The advantage of a
+ * {@link java.util.concurrent.ForkJoinPool} is that its workers have separate
+ * queues, load balanced with work stealing, instead of sharing a single queue.
+ * <p>
+ * The pool is passed to the constructor, so several stages may share one pool.
+ * <p>
+ * Some throughput advantage over other stage implementations should be seen when
+ * one Wake stage submits to another stage using the same {@link WakeSharedPool};
+ * in that case the new event may be executed directly by the submitting thread.
+ *
+ * @param <T> type of events
+ */
+public class ForkPoolStage<T> extends AbstractEStage<T> {
+  private static final Logger LOG = Logger.getLogger(ForkPoolStage.class.getName());
+
+  private final EventHandler<T> handler;
+  private final WakeSharedPool pool;
+
+  /**
+   * Creates a named stage and registers it with the {@link StageManager}.
+   *
+   * @param stageName  the name of this stage
+   * @param handler    the handler invoked for every event
+   * @param sharedPool the pool the event tasks are submitted to
+   */
+  @Inject
+  public ForkPoolStage(@Parameter(StageConfiguration.StageName.class) final String stageName,
+                       @Parameter(StageConfiguration.StageHandler.class) final EventHandler<T> handler,
+                       final WakeSharedPool sharedPool) {
+    super(stageName);
+    this.handler = handler;
+    this.pool = sharedPool;
+    //TODO: should WakeSharedPool register its stages?
+
+    StageManager.instance().register(this);
+  }
+
+  /**
+   * Creates a stage named after this class.
+   *
+   * @param handler    the handler invoked for every event
+   * @param sharedPool the pool the event tasks are submitted to
+   */
+  @Inject
+  public ForkPoolStage(@Parameter(StageConfiguration.StageHandler.class) final EventHandler<T> handler,
+                       final WakeSharedPool sharedPool) {
+    this(ForkPoolStage.class.getName(), handler, sharedPool);
+  }
+
+  /**
+   * Submits the event to the pool; the handler is invoked from within the submitted task.
+   *
+   * @param value the event
+   */
+  @Override
+  public void onNext(final T value) {
+    beforeOnNext();
+    pool.submit(new ForkJoinTask<T>() {
+      @Override
+      protected boolean exec() {
+        handler.onNext(value);
+        afterOnNext();
+        return true;
+      }
+
+      @Override
+      public T getRawResult() {
+        // events produce no result; kept as an extension point
+        return null;
+      }
+
+      @Override
+      protected void setRawResult(final T ignored) {
+        // events produce no result; kept as an extension point
+      }
+    });
+  }
+
+
+  /**
+   * Does not release anything itself; only logs a reminder that the pool has to be closed.
+   */
+  @Override
+  public void close() throws Exception {
+    final String poolType = pool.getClass().getName();
+    LOG.warning("close(): " + poolType + " " + pool + " must really be close()'d");
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java
new file mode 100644
index 0000000..557d82c
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.AbstractEStage;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * This stage uses a thread pool to schedule events in parallel.
+ * Should be used when input events are already materialized in a List and
+ * can be fired in any order.
+ * <p>
+ * Each incoming list is cut into consecutive chunks of at most {@code granularity}
+ * events. Every chunk becomes one task on a fixed-size thread pool, and the events
+ * inside a chunk are passed to the handler one after another.
+ *
+ * @param <T> type of the individual events
+ */
+public class IndependentIterationsThreadPoolStage<T> extends AbstractEStage<List<T>> {
+
+  private static final Logger LOG = Logger.getLogger(IndependentIterationsThreadPoolStage.class.getName());
+
+  private final int granularity;
+  private final EventHandler<T> handler;
+  private final ExecutorService executor;
+
+  /**
+   * Creates the stage together with its own fixed-size thread pool.
+   *
+   * @param handler     the handler invoked for every single event
+   * @param numThreads  fixed number of threads available in the pool
+   * @param granularity maximum number of events executed serially. The right choice
+   *                    will balance task spawn overhead with parallelism.
+   * @throws IllegalArgumentException if {@code granularity} is not positive
+   */
+  public IndependentIterationsThreadPoolStage(EventHandler<T> handler, int numThreads, int granularity) {
+    super(handler.getClass().getName());
+    // A step of zero would make onNext() loop forever, a negative one would make
+    // subList() fail; reject both before any thread pool is created.
+    if (granularity <= 0) {
+      throw new IllegalArgumentException("granularity must be positive: " + granularity);
+    }
+    this.handler = handler;
+    this.executor = Executors.newFixedThreadPool(numThreads);
+    this.granularity = granularity;
+  }
+
+  /**
+   * Wraps one chunk of events into a task that fires them serially.
+   */
+  private Runnable newTask(final List<T> iterations) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        for (T e : iterations) {
+          handler.onNext(e);
+        }
+      }
+    };
+  }
+
+  /**
+   * Splits the list into chunks and submits one task per chunk.
+   * The chunks are {@code subList} views of the given list.
+   *
+   * @param iterations the events to process
+   */
+  @Override
+  public void onNext(final List<T> iterations) {
+    final int size = iterations.size();
+    LOG.info("Execute new task [" + size + "]");
+    int from = 0;
+    while (from < size) {
+      // size - from never overflows, unlike from + granularity for a huge granularity
+      final int to = from + Math.min(granularity, size - from);
+      executor.execute(newTask(iterations.subList(from, to)));
+      from = to;
+    }
+  }
+
+  /**
+   * Shuts the pool down and waits for the already submitted tasks to finish.
+   */
+  @Override
+  public void close() throws Exception {
+    executor.shutdown();
+    executor.awaitTermination(1000, TimeUnit.DAYS);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingEventHandler.java
new file mode 100644
index 0000000..86269a4
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingEventHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An event handler that writes every event it receives to the log.
+ *
+ * @param <T> type of the events
+ */
+public class LoggingEventHandler<T> implements EventHandler<T> {
+  private static final Logger LOG = Logger.getLogger(LoggingEventHandler.class.getName());
+
+  @Inject
+  public LoggingEventHandler() {
+  }
+
+  /**
+   * Logs the event at level {@code FINE}. The event is passed as a log parameter,
+   * so it is only rendered when the record is actually formatted.
+   *
+   * @param value an event
+   */
+  @Override
+  public void onNext(final T value) {
+    LOG.log(Level.FINE, "{0}", value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingUtils.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingUtils.java
new file mode 100644
index 0000000..b85c37c
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Utility for logging
+ */
+public class LoggingUtils {
+
+  /**
+   * Sets the logging level of the root logger and of its console handler.
+   * The first {@link ConsoleHandler} attached to the root logger is used; when
+   * there is none, a new one is created and attached.
+   *
+   * @param level the logging level
+   */
+  public static void setLoggingLevel(Level level) {
+    final Logger rootLogger = Logger.getLogger("");
+    ConsoleHandler console = firstConsoleHandler(rootLogger);
+    if (console == null) {
+      console = new ConsoleHandler();
+      rootLogger.addHandler(console);
+    }
+    console.setLevel(level);
+    rootLogger.setLevel(level);
+  }
+
+  /**
+   * Returns the first console handler attached to the logger, or null if there is none.
+   */
+  private static ConsoleHandler firstConsoleHandler(final Logger logger) {
+    for (final Handler candidate : logger.getHandlers()) {
+      if (candidate instanceof ConsoleHandler) {
+        return (ConsoleHandler) candidate;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingVoidEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingVoidEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingVoidEventHandler.java
new file mode 100644
index 0000000..d9f923f
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingVoidEventHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An event handler for {@code Void} events that only writes a log entry.
+ */
+public class LoggingVoidEventHandler implements EventHandler<Void> {
+  private static final Logger LOG = Logger.getLogger(LoggingVoidEventHandler.class.getName());
+
+  @Inject
+  public LoggingVoidEventHandler() {
+  }
+
+  /**
+   * Writes a fixed message at level {@code FINE}; the event itself is not used.
+   *
+   * @param value an event
+   */
+  @Override
+  public void onNext(final Void value) {
+    LOG.log(Level.FINE, "Logging void event handler");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java
new file mode 100644
index 0000000..548ef0d
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An EventHandler combines two events of different types into a single Pair of events.
+ * Handler will block until both events are received.
+ * <p/>
+ * onNext is thread safe
+ *
+ * @param <L> type of event
+ * @param <R> type of event
+ * @see BlockingEventHandler
+ */
+@Unit
+public final class MergingEventHandler<L, R> {
+
+  private static Logger LOG = Logger.getLogger(MergingEventHandler.class.getName());
+  public final EventHandler<L> left = new Left();
+  public final EventHandler<R> right = new Right();
+  private final Object mutex = new Object();
+  private final EventHandler<Pair<L, R>> destination;
+  private L leftEvent;
+  private R rightEvent;
+
+  @Inject
+  public MergingEventHandler(final EventHandler<Pair<L, R>> destination) {
+    this.destination = destination;
+    reset();
+  }
+
+  /*
+   * Not thread safe. Must be externally synchronized.
+   */
+  private void reset() {
+    rightEvent = null;
+    leftEvent = null;
+  }
+
+  public static class Pair<S1, S2> {
+    public final S1 first;
+    public final S2 second;
+
+    private Pair(S1 s1, S2 s2) {
+      this.first = s1;
+      this.second = s2;
+    }
+  }
+
+  private class Left implements EventHandler<L> {
+
+    @Override
+    public void onNext(final L event) {
+
+      L leftRef = null;
+      R rightRef = null;
+
+      synchronized (mutex) {
+
+        while (leftEvent != null) {
+          try {
+            mutex.wait();
+          } catch (final InterruptedException e) {
+            LOG.log(Level.SEVERE, "Wait interrupted.", e);
+          }
+        }
+
+        if (LOG.isLoggable(Level.FINEST)) {
+          LOG.log(Level.FINEST, "{0} producing left {1}",
+              new Object[]{Thread.currentThread(), event});
+        }
+
+        leftEvent = event;
+        leftRef = event;
+
+        if (rightEvent != null) {
+          rightRef = rightEvent;
+          reset();
+          mutex.notifyAll();
+        }
+      }
+
+      if (rightRef != null) {
+        // I get to fire the event
+        destination.onNext(new Pair<L, R>(leftRef, rightRef));
+      }
+    }
+  }
+
+  private class Right implements EventHandler<R> {
+
+    @Override
+    public void onNext(final R event) {
+
+      L leftRef = null;
+      R rightRef = null;
+
+      synchronized (mutex) {
+
+        while (rightEvent != null) {
+          try {
+            mutex.wait();
+          } catch (final InterruptedException e) {
+            LOG.log(Level.SEVERE, "Wait interrupted.", e);
+          }
+        }
+
+        if (LOG.isLoggable(Level.FINEST)) {
+          LOG.log(Level.FINEST, "{0} producing right {1}",
+              new Object[]{Thread.currentThread(), event});
+        }
+
+        rightEvent = event;
+        rightRef = event;
+
+        if (leftEvent != null) {
+          leftRef = leftEvent;
+          reset();
+          mutex.notifyAll();
+        }
+      }
+
+      if (leftRef != null) {
+        // I get to fire the event
+        destination.onNext(new Pair<L, R>(leftRef, rightRef));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MissingStartHandlerHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MissingStartHandlerHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MissingStartHandlerHandler.java
new file mode 100644
index 0000000..6debd97
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MissingStartHandlerHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The EventHandler used as the default for the Clock.StartHandler event.
+ * It only logs a warning that no start handler has been bound.
+ */
+public class MissingStartHandlerHandler implements EventHandler<StartTime> {
+
+  // Named with getName(): Class.toString() would yield "class org.apache...",
+  // a name outside the package-based logger hierarchy.
+  private static final Logger LOG = Logger.getLogger(MissingStartHandlerHandler.class.getName());
+
+  @Inject
+  private MissingStartHandlerHandler() {
+  }
+
+  /**
+   * Logs a warning; the event itself is not used.
+   *
+   * @param value the start time event
+   */
+  @Override
+  public void onNext(final StartTime value) {
+    LOG.log(Level.WARNING,
+        "No binding to Clock.StartHandler. It is likely that the clock will immediately go idle and close.");
+  }
+}