You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:35 UTC
[02/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EStage.java
new file mode 100644
index 0000000..5e79d39
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EStage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/**
+ * A {@link Stage} that is itself an {@link EventHandler}: it accepts events of
+ * type {@code T} through {@code onNext} and, like every stage, can be closed.
+ * The interface declares no members of its own.
+ *
+ * @param <T> the type of event handled by this stage
+ */
+public interface EStage<T> extends EventHandler<T>, Stage {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EventHandler.java
new file mode 100644
index 0000000..871e64e
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/EventHandler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/**
+ * Callback that consumes events of a single type.
+ *
+ * @param <T> the type of event accepted by this handler
+ */
+public interface EventHandler<T> {
+
+  /**
+   * Processes one event.
+   *
+   * @param value the event to handle
+   */
+  void onNext(T value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifiable.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifiable.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifiable.java
new file mode 100644
index 0000000..784dafb
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifiable.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/**
+ * Implemented by objects that expose an {@link Identifier}.
+ */
+public interface Identifiable {
+
+  /**
+   * Returns the identifier of this object.
+   *
+   * @return the identifier of this object
+   */
+  Identifier getId();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifier.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifier.java
new file mode 100644
index 0000000..377b23e
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Identifier.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/**
+ * An identifier class for REEF. Identifiers are a generic naming primitive
+ * that carry some information about the type of the object they point to.
+ * Typical examples are server sockets, filenames, and requests.
+ * <p>
+ * Identifier constructors should take zero arguments, or take a single string.
+ * <p>
+ * The interface re-declares {@code hashCode}, {@code equals} and
+ * {@code toString} from {@link Object} in order to document what is expected
+ * of implementations.
+ */
+public interface Identifier {
+
+  /**
+   * Returns a hash code for this identifier.
+   *
+   * @return a hash code value for this object
+   */
+  int hashCode();
+
+  /**
+   * Checks whether another object is equal to this identifier.
+   *
+   * @param o the object to compare with
+   * @return true if this object is the same as the argument; false otherwise
+   */
+  boolean equals(Object o);
+
+  /**
+   * Returns a string representation of this identifier. Implementations
+   * should return a URL-style string that begins with "type://", where "type"
+   * is chosen to uniquely identify this type of identifier.
+   *
+   * @return the URL-style string form of this identifier
+   */
+  String toString();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierFactory.java
new file mode 100644
index 0000000..828eda9
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/**
+ * Factory that builds {@link Identifier} objects from strings.
+ */
+public interface IdentifierFactory {
+
+  /**
+   * Creates an identifier from its string form.
+   *
+   * @param str the string describing the identifier
+   * @return the identifier built from {@code str}
+   */
+  Identifier getNewInstance(String str);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierParser.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierParser.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierParser.java
new file mode 100644
index 0000000..a2e7df9
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/IdentifierParser.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+import org.apache.reef.tang.ExternalConstructor;
+import org.apache.reef.wake.impl.DefaultIdentifierFactory;
+import org.apache.reef.wake.remote.impl.SocketRemoteIdentifier;
+import org.apache.reef.wake.storage.FileIdentifier;
+
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Tang {@link ExternalConstructor} that turns a string into an
+ * {@link Identifier}. The string is parsed once, in the constructor, by a
+ * shared {@link DefaultIdentifierFactory} that knows the "socket" and "file"
+ * identifier types; {@link #newInstance()} then returns that same object on
+ * every call.
+ */
+public class IdentifierParser implements ExternalConstructor<Identifier> {
+  // TODO: Modify tang to allow this to use a factory pattern.
+  private static final IdentifierFactory FACTORY = createFactory();
+
+  /** The identifier parsed from the constructor argument. */
+  final Identifier id;
+
+  /**
+   * Parses the given string into an identifier.
+   *
+   * @param s the string form of the identifier
+   */
+  @Inject
+  IdentifierParser(String s) {
+    this.id = FACTORY.getNewInstance(s);
+  }
+
+  /** Builds the shared factory with the identifier types registered by name. */
+  private static IdentifierFactory createFactory() {
+    final Map<String, Class<? extends Identifier>> typeToClass = new ConcurrentHashMap<>();
+    typeToClass.put("socket", SocketRemoteIdentifier.class);
+    typeToClass.put("file", FileIdentifier.class);
+    return new DefaultIdentifierFactory(typeToClass);
+  }
+
+  /**
+   * Returns the identifier parsed at construction time.
+   *
+   * @return the parsed identifier (the same instance on every call)
+   */
+  @Override
+  public Identifier newInstance() {
+    return this.id;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Stage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Stage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Stage.java
new file mode 100644
index 0000000..df79d0f
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/Stage.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+/**
+ * An execution unit for events. A stage is {@link AutoCloseable}, so its
+ * resources are reclaimed by calling {@code close()}.
+ */
+public interface Stage extends AutoCloseable {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/StageConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/StageConfiguration.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/StageConfiguration.java
new file mode 100644
index 0000000..0ccbb3a
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/StageConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.rx.Observer;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Configuration options for Wake stages.
+ * <p>
+ * This class only groups Tang named parameters: each nested class is an empty
+ * marker whose {@code Name<T>} type argument gives the type of the configured
+ * value and whose {@code @NamedParameter} annotation carries its description.
+ * None of the parameters declares a default value.
+ */
+public final class StageConfiguration {
+
+ /** Name of the stage (a {@code String}). */
+ @NamedParameter(doc = "The stage name.")
+ public static final class StageName implements Name<String> {
+ }
+
+ /** Event handler of the stage; the event type is left open. */
+ @NamedParameter(doc = "The event handler for the stage.")
+ public static final class StageHandler implements Name<EventHandler<?>> {
+ }
+
+ /** Handler that receives {@code Throwable}s for the stage. */
+ @NamedParameter(doc = "The error handler for the stage.")
+ public static final class ErrorHandler implements Name<EventHandler<Throwable>> {
+ }
+
+ /** Number of threads of the stage (an {@code Integer}). */
+ @NamedParameter(doc = "The number of threads for the stage.")
+ public static final class NumberOfThreads implements Name<Integer> {
+ }
+
+ /** Capacity of the stage (an {@code Integer}); what is counted is not specified here. */
+ @NamedParameter(doc = "The capacity for the stage.")
+ public static final class Capacity implements Name<Integer> {
+ }
+
+ /** Executor service used by the stage. */
+ @NamedParameter(doc = "The executor service for the stage.")
+ public static final class StageExecutorService implements Name<ExecutorService> {
+ }
+
+ /** Initial delay of the timer stage (a {@code Long}); the time unit is not specified here. */
+ @NamedParameter(doc = "The initial delay for periodic events of the timer stage.")
+ public static final class TimerInitialDelay implements Name<Long> {
+ }
+
+ /** Period of the timer stage (a {@code Long}); the time unit is not specified here. */
+ @NamedParameter(doc = "The period for periodic events of the timer stage.")
+ public static final class TimerPeriod implements Name<Long> {
+ }
+
+ /** Observer of the stage; the event type is left open. */
+ @NamedParameter(doc = "The observer for the stage.")
+ public static final class StageObserver implements Name<Observer<?>> {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeConfiguration.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeConfiguration.java
new file mode 100644
index 0000000..7adee0b
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeConfiguration.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationFile;
+import org.apache.reef.wake.exception.WakeRuntimeException;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Wake parameter configuration.
+ * <p>
+ * On construction the named configuration file, if any, is read into a fresh
+ * Tang configuration builder. The builder is not kept afterwards, so the
+ * visible effect of the constructor is a warning when no file is given and a
+ * {@link WakeRuntimeException} when the file cannot be loaded.
+ */
+public final class WakeConfiguration {
+  private static final Logger LOG = Logger.getLogger(WakeConfiguration.class.getName());
+
+  /**
+   * Loads the Wake configuration file.
+   *
+   * @param confFileName path of the configuration file; the empty string
+   *                     (the default of {@link FileName}) means "not specified"
+   * @throws WakeRuntimeException if adding the file to the configuration
+   *                              builder fails with a bind or I/O error
+   */
+  @Inject
+  public WakeConfiguration(final @Parameter(FileName.class) String confFileName) {
+    if (confFileName.isEmpty()) {
+      LOG.log(Level.WARNING, "The Wake configuration file is not specified.");
+      return;
+    }
+
+    final Tang tang = Tang.Factory.getTang();
+    final JavaConfigurationBuilder builder = tang.newConfigurationBuilder();
+    try {
+      ConfigurationFile.addConfiguration(builder, new File(confFileName));
+    } catch (BindException | IOException e) {
+      throw new WakeRuntimeException(e);
+    }
+  }
+
+  /** Named parameter holding the configuration file name; defaults to the empty string. */
+  @NamedParameter(doc = "Configuration file name", default_value = "")
+  public static final class FileName implements Name<String> {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeParameters.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeParameters.java
new file mode 100644
index 0000000..897cc08
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/WakeParameters.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Default parameters for Wake.
+ * <p>
+ * The constants below are also used, converted to strings, as the default
+ * values of the named parameters declared in this class.
+ */
+public final class WakeParameters {
+
+  /** Default maximum frame length: 1024 * 1024. */
+  public static final int MAX_FRAME_LENGTH = 1024 * 1024;
+
+  /** Default executor shutdown timeout (presumably milliseconds -- TODO confirm). */
+  public static final long EXECUTOR_SHUTDOWN_TIMEOUT = 1000L;
+
+  /** Default remote executor shutdown timeout (presumably milliseconds -- TODO confirm). */
+  public static final long REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT = 10000L;
+
+  /** Maximum frame length; defaults to {@link #MAX_FRAME_LENGTH}. */
+  @NamedParameter(doc = "Maximum frame length unit", default_value = "" + MAX_FRAME_LENGTH)
+  public static final class MaxFrameLength implements Name<Integer> {
+  }
+
+  /**
+   * Executor shutdown timeout; defaults to {@link #EXECUTOR_SHUTDOWN_TIMEOUT}.
+   * NOTE(review): the parameter is an {@code Integer} while the constant is a {@code long}.
+   */
+  @NamedParameter(doc = "Executor shutdown timeout", default_value = "" + EXECUTOR_SHUTDOWN_TIMEOUT)
+  public static final class ExecutorShutdownTimeout implements Name<Integer> {
+  }
+
+  /**
+   * Remote send timeout; defaults to {@link #REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT}.
+   * NOTE(review): the parameter is an {@code Integer} while the constant is a {@code long}.
+   */
+  @NamedParameter(doc = "Remote send timeout", default_value = "" + REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT)
+  public static final class RemoteSendTimeout implements Name<Integer> {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/accumulate/CombinerStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/accumulate/CombinerStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/accumulate/CombinerStage.java
new file mode 100644
index 0000000..63b4d30
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/accumulate/CombinerStage.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.accumulate;
+
+
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.rx.Observer;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Example stage that merges incoming key/value pairs per key and forwards the
+ * merged entries to a downstream observer from a dedicated output thread.
+ * <p>
+ * Producers push entries through the observer returned by {@link #wireIn()}.
+ * Each entry is folded into an internal register using the supplied
+ * {@link Combiner}; the output thread removes entries from the register and
+ * hands them to the downstream observer. {@link #close()} blocks until the
+ * output thread has finished, which happens only after {@code onCompleted()}
+ * has been called on the input observer and the register has been drained.
+ *
+ * @param <K> key type; the register keeps keys in their natural order
+ * @param <V> value type
+ */
+public class CombinerStage<K extends Comparable<K>, V> implements Stage {
+
+  /** Merges a newly arrived value into the value currently registered for a key. */
+  private final Combiner<K, V> c;
+  /** Downstream observer that receives the combined entries. */
+  private final Observer<Map.Entry<K, V>> o;
+  /** Single thread that drains the register; started by the constructor. */
+  private final OutputThread worker = new OutputThread();
+  /** Pending combined values; also used as the monitor for wait/notify. */
+  private final ConcurrentSkipListMap<K, V> register = new ConcurrentSkipListMap<>();
+  /** Set once the input side has signalled completion. */
+  private volatile boolean done = false;
+
+  /**
+   * Creates the stage and starts its output thread.
+   *
+   * @param c the combiner used to merge values that share a key
+   * @param o the observer that receives combined entries, errors and completion
+   */
+  public CombinerStage(Combiner<K, V> c, Observer<Map.Entry<K, V>> o) {
+    this.c = c;
+    this.o = o;
+    worker.start();
+  }
+
+  /**
+   * Returns a new observer that feeds entries into this stage.
+   *
+   * @return the input observer of this stage
+   */
+  public Observer<Map.Entry<K, V>> wireIn() {
+    return new Observer<Map.Entry<K, V>>() {
+      @Override
+      public void onNext(Map.Entry<K, V> pair) {
+        final K key = pair.getKey();
+        boolean succ = false;
+
+        // Optimistic update: retry until the register still holds the value
+        // that was combined with (or still has no value for the key).
+        while (!succ) {
+          final V old = register.get(key);
+          final V newVal = c.combine(key, old, pair.getValue());
+          if (old == null) {
+            succ = (null == register.putIfAbsent(key, newVal));
+          } else {
+            succ = register.replace(key, old, newVal);
+          }
+        }
+
+        // Always wake the output thread. Notifying only when the register was
+        // empty before the insert is racy: the output thread can drain the
+        // register and start waiting between that check and the insert, and
+        // would then never be woken up.
+        synchronized (register) {
+          register.notify();
+        }
+      }
+
+      @Override
+      public void onError(Exception error) {
+        o.onError(error);
+      }
+
+      @Override
+      public void onCompleted() {
+        synchronized (register) {
+          done = true;
+          // Unconditional for the same reason as in onNext: a waiting output
+          // thread must observe the completion.
+          register.notify();
+        }
+      }
+    };
+  }
+
+  /**
+   * Waits for the output thread to terminate.
+   *
+   * @throws Exception if the wait is interrupted
+   */
+  @Override
+  public void close() throws Exception {
+    worker.join();
+  }
+
+  /**
+   * Merge function applied to values that share a key.
+   *
+   * @param <K> key type
+   * @param <V> value type
+   */
+  public interface Combiner<K extends Comparable<K>, V> {
+    /**
+     * Combines the registered value with a newly arrived one.
+     *
+     * @param key the key both values belong to
+     * @param old the value currently registered, or null if there is none
+     * @param cur the newly arrived value
+     * @return the value to register for the key
+     */
+    V combine(K key, V old, V cur);
+  }
+
+  /**
+   * Immutable key/value entry ordered by key.
+   *
+   * @param <K> key type
+   * @param <V> value type
+   */
+  public static class Pair<K extends Comparable<K>, V> implements Map.Entry<K, V>, Comparable<Map.Entry<K, V>> {
+    private final K k;
+    private final V v;
+
+    public Pair(K k, V v) {
+      this.k = k;
+      this.v = v;
+    }
+
+    /** Compares by key only. */
+    @Override
+    public int compareTo(Map.Entry<K, V> arg0) {
+      return k.compareTo(arg0.getKey());
+    }
+
+    @Override
+    public K getKey() {
+      return k;
+    }
+
+    @Override
+    public V getValue() {
+      return v;
+    }
+
+    /** Always throws: pairs cannot be modified. */
+    @Override
+    public V setValue(V value) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /** Thread that drains the register into the downstream observer. */
+  private class OutputThread extends Thread {
+    public OutputThread() {
+      super("grouper-output-thread");
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        if (register.isEmpty()) {
+          synchronized (register) {
+            while (register.isEmpty() && !done) {
+              try {
+                register.wait();
+              } catch (InterruptedException e) {
+                // Keep the interrupt visible to whoever handles the failure.
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException(e);
+              }
+            }
+            // Stop only once everything has been emitted: an entry may have
+            // been added between the unsynchronized emptiness check above and
+            // the completion signal, and must not be dropped.
+            if (done && register.isEmpty()) {
+              break;
+            }
+          }
+        }
+        Map.Entry<K, V> cursor = register.pollFirstEntry();
+        while (cursor != null) {
+          o.onNext(cursor);
+          K nextKey = register.higherKey(cursor.getKey());
+
+          /* If there is more than one OutputThread worker then the remove() -> null case
+           * must be handled
+           */
+          cursor = (nextKey == null) ? null : new Pair<>(nextKey, register.remove(nextKey));
+        }
+      }
+      o.onCompleted();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java
new file mode 100644
index 0000000..a5f0ba6
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.join;
+
+import org.apache.reef.wake.rx.Observer;
+import org.apache.reef.wake.rx.StaticObservable;
+
+import java.util.concurrent.ConcurrentSkipListSet;
+
+
+/**
+ * Example join of two event streams. The left stream is collected into a
+ * sorted set; every right event is forwarded to the output observer if the
+ * left set contains it. Right-side calls block until the left stream has
+ * signalled completion.
+ */
+public class BlockingJoin implements StaticObservable {
+  /** Receives the right events that are also present in the left set. */
+  private final Observer<TupleEvent> out;
+  /** All events received on the left input. */
+  private final ConcurrentSkipListSet<TupleEvent> left = new ConcurrentSkipListSet<>();
+  /**
+   * True once the left input has completed. Volatile because the right input
+   * reads it without holding this object's monitor.
+   */
+  volatile boolean leftDone = false;
+
+  /**
+   * Creates a join that emits its results to the given observer.
+   *
+   * @param out the observer that receives matching events and completion
+   */
+  public BlockingJoin(Observer<TupleEvent> out) {
+    this.out = out;
+  }
+
+  /** Marks the left input as complete and wakes every waiting thread. */
+  private synchronized void tellEveryoneLeftIsDone() {
+    leftDone = true;
+    notifyAll();
+  }
+
+  /**
+   * Blocks the calling thread until the left input has completed.
+   *
+   * @throws IllegalStateException if the thread is interrupted while waiting
+   */
+  private synchronized void waitUntilLeftIsDone() {
+    while (!leftDone) {
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller's thread.
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "No support for interrupted threads here!", e);
+      }
+    }
+  }
+
+  /**
+   * Returns a new observer for the left input.
+   *
+   * @return an observer that stores events and signals left-side completion
+   */
+  public Observer<TupleEvent> wireLeft() {
+    return new Observer<TupleEvent>() {
+
+      @Override
+      public void onNext(TupleEvent value) {
+        left.add(value);
+      }
+
+      @Override
+      public void onError(Exception error) {
+        // NOTE(review): left-side errors are dropped, not forwarded to out.
+      }
+
+      @Override
+      public void onCompleted() {
+        tellEveryoneLeftIsDone();
+      }
+
+    };
+  }
+
+  /**
+   * Returns a new observer for the right input.
+   *
+   * @return an observer that forwards events found in the left set
+   */
+  public Observer<TupleEvent> wireRight() {
+    return new Observer<TupleEvent>() {
+
+      @Override
+      public void onNext(TupleEvent value) {
+        // Fast path: skip the monitor once the left side is known to be done.
+        if (!leftDone) {
+          waitUntilLeftIsDone();
+        }
+        if (left.contains(value)) {
+          out.onNext(value);
+        }
+      }
+
+      @Override
+      public void onError(Exception error) {
+        // NOTE(review): right-side errors are dropped, not forwarded to out.
+      }
+
+      @Override
+      public void onCompleted() {
+        waitUntilLeftIsDone();
+        out.onCompleted();
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/EventPrinter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/EventPrinter.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/EventPrinter.java
new file mode 100644
index 0000000..812be3a
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/EventPrinter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.join;
+
+import org.apache.reef.wake.rx.Observer;
+
+public class EventPrinter<T> implements Observer<T> {
+
+ @Override
+ public void onNext(T value) {
+ System.out.println(this + ": " + value);
+ }
+
+ @Override
+ public void onError(Exception error) {
+ System.err.println(this + ": " + error);
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println(this + ": done!");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/NonBlockingJoin.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/NonBlockingJoin.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/NonBlockingJoin.java
new file mode 100644
index 0000000..3aa5647
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/NonBlockingJoin.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.join;
+
+import org.apache.reef.wake.rx.Observer;
+import org.apache.reef.wake.rx.StaticObservable;
+
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Example join of two tuple streams, "left" and "right", that forwards matching
+ * tuples to a single output observer.
+ * <p/>
+ * Left tuples are collected in leftTable. A right tuple is emitted immediately
+ * when leftTable already contains it; otherwise it is buffered in rightTable,
+ * but only while the left input has not completed. Buffered right tuples are
+ * re-checked by drainRight() once the left input has completed.
+ * out.onCompleted() is sent at most once, and only after both inputs have
+ * signalled completion.
+ */
+public class NonBlockingJoin implements StaticObservable {
+ private final AtomicBoolean leftDone = new AtomicBoolean(false); // set when the left input completes
+ private final AtomicBoolean completed = new AtomicBoolean(false); // set when the right input completes
+ private final AtomicBoolean sentCompleted = new AtomicBoolean(false); // ensures out.onCompleted() fires only once
+
+ private final Observer<TupleEvent> out;
+
+ private final ConcurrentSkipListSet<TupleEvent> leftTable = new ConcurrentSkipListSet<TupleEvent>();
+ private final ConcurrentSkipListSet<TupleEvent> rightTable = new ConcurrentSkipListSet<TupleEvent>();
+ /** @param out the observer that receives joined tuples, errors and the completion signal */
+ public NonBlockingJoin(Observer<TupleEvent> out) {
+ this.out = out;
+ }
+ /**
+ * Does nothing until the left input has completed. After that, removes every
+ * buffered right tuple and emits those that are present in leftTable. If the
+ * right input has completed as well, drains once more and then sends
+ * onCompleted to the output, guarded so that it is sent only once.
+ */
+ private void drainRight() {
+ TupleEvent t;
+ if (leftDone.get()) {
+ while ((t = rightTable.pollFirst()) != null) {
+ if (leftTable.contains(t)) {
+ out.onNext(t);
+ }
+ }
+ if (completed.get()) {
+ // There cannot be any more additions to rightTable after
+ // completed is set to true, so this ensures that rightTable is
+ // really empty. (Someone could have inserted into it during the
+ // race between the previous while loop and the check of
+ // completed.)
+ while ((t = rightTable.pollFirst()) != null) {
+ if (leftTable.contains(t)) {
+ out.onNext(t);
+ }
+ }
+ if (sentCompleted.getAndSet(true) == false) {
+ out.onCompleted();
+ }
+ }
+ }
+ }
+ /**
+ * @return a new observer for the left input: values are added to leftTable,
+ * an error clears both tables and is forwarded to the output, and
+ * completion marks the left side done and drains the buffered right tuples
+ */
+ public Observer<TupleEvent> wireLeft() {
+ return new Observer<TupleEvent>() {
+
+ @Override
+ public void onNext(TupleEvent value) {
+ leftTable.add(value);
+ }
+
+ @Override
+ public void onError(Exception error) {
+ leftTable.clear();
+ rightTable.clear();
+ out.onError(error);
+ }
+
+ @Override
+ public void onCompleted() {
+ leftDone.set(true);
+ drainRight();
+ }
+
+ };
+ }
+ /**
+ * @return a new observer for the right input: a value already present in
+ * leftTable is emitted at once, any other value is buffered while the left
+ * input is still open (and dropped otherwise); an error clears both tables
+ * and is forwarded to the output; completion marks the right side done
+ */
+ public Observer<TupleEvent> wireRight() {
+ return new Observer<TupleEvent>() {
+
+ @Override
+ public void onNext(TupleEvent value) {
+ if (leftTable.contains(value)) {
+ out.onNext(value);
+ } else if (!leftDone.get()) {
+ rightTable.add(value);
+ }
+ drainRight();
+ }
+
+ @Override
+ public void onError(Exception error) {
+ leftTable.clear();
+ rightTable.clear();
+ out.onError(error);
+ }
+
+ @Override
+ public void onCompleted() {
+ completed.set(true);
+ drainRight();
+ }
+ };
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleEvent.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleEvent.java
new file mode 100644
index 0000000..978b2de
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleEvent.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.join;
+
+
+/**
+ * Immutable (key, value) pair used by the join example.
+ * <p/>
+ * Tuples are ordered by key first and by value second. equals() and
+ * hashCode() are defined over the same two fields so that the natural
+ * ordering is consistent with equals, as recommended by Comparable.
+ */
+public class TupleEvent implements Comparable<TupleEvent> {
+  private final int key;
+  private final String val;
+
+  /**
+   * @param key the integer key of the tuple
+   * @param val the value of the tuple
+   */
+  public TupleEvent(int key, String val) {
+    this.key = key;
+    this.val = val;
+  }
+
+  /**
+   * Compares by key, then by value.
+   *
+   * @param o the tuple to compare against
+   * @return a negative, zero or positive number as this tuple is less than,
+   * equal to or greater than o
+   */
+  @Override
+  public int compareTo(TupleEvent o) {
+    int keycmp = Integer.compare(key, o.key);
+    if (keycmp != 0) {
+      return keycmp;
+    }
+    return val.compareTo(o.val);
+  }
+
+  /**
+   * @param obj the object to compare against
+   * @return true if obj is a TupleEvent with the same key and value
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof TupleEvent)) {
+      return false;
+    }
+    final TupleEvent other = (TupleEvent) obj;
+    if (key != other.key) {
+      return false;
+    }
+    return val == null ? other.val == null : val.equals(other.val);
+  }
+
+  /**
+   * @return a hash code derived from the key and the value
+   */
+  @Override
+  public int hashCode() {
+    return 31 * key + (val == null ? 0 : val.hashCode());
+  }
+
+  /**
+   * @return the tuple formatted as "(key, val)"
+   */
+  @Override
+  public String toString() {
+    return "(" + key + ", " + val + ")";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java
new file mode 100644
index 0000000..378ef3d
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.join;
+
+import org.apache.reef.wake.Stage;
+import org.apache.reef.wake.rx.Observer;
+import org.apache.reef.wake.rx.StaticObservable;
+
+/**
+ * Example source that emits TupleEvents to an observer from several threads.
+ * <p/>
+ * The worker threads are created and started by the constructor. Each tuple
+ * carries an integer key and the decimal string of that key as its value.
+ * close() waits for all workers and then signals completion to the observer.
+ */
+public class TupleSource implements StaticObservable, Stage {
+  final Thread[] threads;
+  final Observer<TupleEvent> out;
+
+  /**
+   * Creates the source and immediately starts its worker threads.
+   *
+   * @param out        the observer that receives the generated tuples
+   * @param max        upper bound used to size each worker's share of the keys
+   * @param numThreads the number of worker threads to start
+   * @param evenOnly   if true, every generated key is doubled and each worker
+   *                   emits half as many tuples
+   */
+  public TupleSource(final Observer<TupleEvent> out, final int max, final int numThreads, final boolean evenOnly) {
+    this.out = out;
+    this.threads = new Thread[numThreads];
+    for (int id = 0; id < numThreads; id++) {
+      final int offset = id;
+      final Thread worker = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          final int stride = evenOnly ? 2 : 1;
+          final int count = max / (stride * numThreads);
+          for (int n = 0; n < count; n++) {
+            // Workers interleave: worker "offset" takes every numThreads-th key.
+            final int key = (n * numThreads + offset) * stride;
+            out.onNext(new TupleEvent(key, key + ""));
+          }
+        }
+      });
+      this.threads[id] = worker;
+      worker.start();
+    }
+  }
+
+  /**
+   * Joins every worker thread, then sends onCompleted to the observer.
+   *
+   * @throws Exception if the calling thread is interrupted while waiting
+   */
+  @Override
+  public void close() throws Exception {
+    for (int id = 0; id < this.threads.length; id++) {
+      this.threads[id].join();
+    }
+    this.out.onCompleted();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/package-info.java
new file mode 100644
index 0000000..4012f98
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ *
+ */
+package org.apache.reef.wake.examples.join;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/EventSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/EventSource.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/EventSource.java
new file mode 100644
index 0000000..202b321
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/EventSource.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.p2p;
+
+/**
+ * The pull side of the interface: Clients implement this and register it with
+ * the Pull2Push class.
+ *
+ * @param <T> the type of the event
+ */
+public interface EventSource<T> {
+
+ /**
+ * @return an event, or null if no more messages are available.
+ */
+ public T getNext();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java
new file mode 100644
index 0000000..78e5bbe
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.examples.p2p;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Performs a Pull-to-Push conversion in Wake.
+ * <p/>
+ * The class pulls from a set of event sources, and pushes to a single
+ * EventHandler. If the downstream event handler blocks, this will block,
+ * providing a simple rate limiting scheme.
+ * <p/>
+ * The EventSources are managed in a basic Queue. That queue is not
+ * synchronized, so sources should be registered before run() is started or
+ * from the thread that executes it. close() may be called from any thread.
+ *
+ * @param <T> the message type
+ */
+public final class Pull2Push<T> implements Runnable, AutoCloseable {
+
+  private static final Logger LOG = Logger.getLogger(Pull2Push.class.getName());
+
+  private final EventHandler<T> output; // The downstream EventHandler
+  private final Queue<EventSource<T>> sources = new LinkedList<>(); // The upstream event sources
+  // volatile: written by close() and polled by the loop in run(), typically on different threads.
+  private volatile boolean closed = false;
+
+  /**
+   * @param output the EventHandler that receives the messages from this
+   *               Pull2Push.
+   */
+  public Pull2Push(final EventHandler<T> output) {
+    this.output = output;
+  }
+
+  /**
+   * Registers an event source.
+   *
+   * @param source The source that will be added to the queue of this
+   *               Pull2Push
+   */
+  public final void register(final EventSource<T> source) {
+    this.sources.add(source);
+  }
+
+  /**
+   * Executes the message loop until close() is called.
+   * <p/>
+   * Sources are served round-robin: a source that produced a message is put
+   * back at the end of the queue, and a source that returned null is dropped.
+   */
+  @Override
+  public void run() {
+
+    while (!this.closed) {
+      // Grab the next available message source, if any
+      final EventSource<T> nextSource = sources.poll();
+      if (null != nextSource) {
+        // Grab the next message from that source, if any
+        final T message = nextSource.getNext();
+        if (null != message) {
+          // Add the source to the end of the queue again.
+          sources.add(nextSource);
+          // Send the message. Note that this may block depending on the underlying EventHandler.
+          this.output.onNext(message);
+        } else {
+          // The message source has returned null as the next message. We drop the message source in that case.
+          LOG.log(Level.INFO, "Droping message source {0} from the queue", nextSource.toString());
+        }
+      } else {
+        // No source was available, so the loop spins. We could put a wait() here.
+      }
+    }
+  }
+
+  /**
+   * Asks the message loop to stop. The loop exits the next time it checks the
+   * flag, i.e. after any message currently being delivered.
+   */
+  @Override
+  public void close() throws Exception {
+    this.closed = true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/package-info.java
new file mode 100644
index 0000000..773a20f
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * A simple pull to push adapter.
+ */
+package org.apache.reef.wake.examples.p2p;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/package-info.java
new file mode 100644
index 0000000..e02a934
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ *
+ */
+package org.apache.reef.wake.examples;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/WakeRuntimeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/WakeRuntimeException.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/WakeRuntimeException.java
new file mode 100644
index 0000000..87aa414
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/WakeRuntimeException.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.exception;
+
+/**
+ * Unchecked exception thrown by Wake.
+ */
+public class WakeRuntimeException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a new Wake runtime exception with the specified detail message and cause.
+ *
+ * @param s the detail message
+ * @param e the cause
+ */
+ public WakeRuntimeException(final String s, final Throwable e) {
+ super(s, e);
+ }
+
+ /**
+ * Constructs a new Wake runtime exception with the specified detail message.
+ *
+ * @param s the detail message
+ */
+ public WakeRuntimeException(final String s) {
+ super(s);
+ }
+
+ /**
+ * Constructs a new Wake runtime exception with the specified cause.
+ *
+ * @param e the cause
+ */
+ public WakeRuntimeException(final Throwable e) {
+ super(e);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/package-info.java
new file mode 100644
index 0000000..90fddee
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/exception/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ *
+ */
+package org.apache.reef.wake.exception;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingEventHandler.java
new file mode 100644
index 0000000..c5da7a5
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingEventHandler.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An EventHandler that spools events until a set number of them has been
+ * received. Once they have been received, the downstream event handler is
+ * called with an Iterable of the events spooled, and spooling starts over
+ * for the next batch.
+ * <p/>
+ * onNext is thread safe. The call that completes a batch is the one that
+ * drains it and invokes the downstream handler, on the calling thread.
+ *
+ * @param <T> type of events
+ * @see BlockingSignalEventHandler
+ */
+public final class BlockingEventHandler<T> implements EventHandler<T> {
+
+ private final int expectedSize;
+ private final EventHandler<Iterable<T>> destination;
+ private final AtomicInteger cursor;
+ // TODO: a queue is likely overly conservative given that we only need
+ // to preserve order of those pairs of events that didn't race (have an ordering)
+ private BlockingQueue<T> events = new LinkedBlockingQueue<>(); // NOTE(review): also the lock object in onNext but not final - confirm it is never reassigned
+ /**
+ * @param expectedSize the number of events that make up one batch
+ * @param destination the handler that receives each completed batch
+ */
+ public BlockingEventHandler(final int expectedSize, final EventHandler<Iterable<T>> destination) {
+ this.expectedSize = expectedSize;
+ this.destination = destination;
+ this.cursor = new AtomicInteger(0);
+ }
+ /**
+ * Spools the event and, if it is the one that brings the running count to a
+ * multiple of expectedSize, removes expectedSize events from the head of the
+ * queue and hands them to the destination as one list.
+ *
+ * @param event the event to spool
+ */
+ @Override
+ public final void onNext(final T event) {
+ this.events.add(event);
+ int newCursor = this.cursor.incrementAndGet();
+
+ if (newCursor % expectedSize == 0) {
+ // FIXME: There is a race here where the person draining the events might
+ // not include their event as the last one. I'm going to assume this does not
+ // matter, since all events will still be drained exactly once by someone in
+ // the proper order
+
+ ArrayList<T> nonConcurrent = new ArrayList<>(expectedSize);
+ synchronized (events) {
+
+ // drainTo(maxElements) does not suffice because it has undefined behavior for
+ // any modifications (a better spec would possibly be undefined behavior except for appends)
+
+ // TODO: a non-locking implementation will simply atomically update the head of the
+ // queue to index=expectedSize, so that the drainer may drain without synchronization
+ for (int i = 0; i < expectedSize; i++) {
+ nonConcurrent.add(events.poll());
+ }
+ }
+ this.destination.onNext(nonConcurrent); // delivered outside the lock
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingSignalEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingSignalEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingSignalEventHandler.java
new file mode 100644
index 0000000..e590852
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/BlockingSignalEventHandler.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An EventHandler that counts incoming events and calls the downstream event
+ * handler once per batch of expectedSize events, passing along only the
+ * <i>last event</i> of that batch. Counting continues atomically into the
+ * next batch.
+ * <p/>
+ * onNext is thread safe
+ *
+ * @param <T> type of events
+ * @see BlockingEventHandler
+ */
+public final class BlockingSignalEventHandler<T> implements EventHandler<T> {
+
+  private final int expectedSize;
+  private final EventHandler<T> destination;
+  private final AtomicInteger cursor = new AtomicInteger(0);
+
+  /**
+   * @param expectedSize the number of events that make up one batch
+   * @param destination  the handler that receives the event completing a batch
+   */
+  public BlockingSignalEventHandler(final int expectedSize, final EventHandler<T> destination) {
+    this.destination = destination;
+    this.expectedSize = expectedSize;
+  }
+
+  /**
+   * Counts the event and forwards it only if it completes a batch.
+   *
+   * @param event the received event
+   */
+  @Override
+  public final void onNext(final T event) {
+    final int received = this.cursor.incrementAndGet();
+    final boolean batchComplete = received % this.expectedSize == 0;
+    if (!batchComplete) {
+      return;
+    }
+    this.destination.onNext(event);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
new file mode 100644
index 0000000..a186659
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
+import org.apache.reef.wake.remote.impl.SocketRemoteIdentifier;
+
+import javax.inject.Inject;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Default remote identifier factory that creates a specific remote identifier
+ * from a string representation
+ * <p/>
+ * A string representation is broken into two parts type and type-specific details separated by "://"
+ * A remote identifier implementation should implement a constructor that accepts a string.
+ * The factory invokes a proper constructor by reflection.
+ */
+public class DefaultIdentifierFactory implements IdentifierFactory {
+
+  // Separator between the type and the type-specific details
+  private static final String TYPE_SEPARATOR = "://";
+
+  // map between type and remote identifier class
+  private final Map<String, Class<? extends Identifier>> typeToClazzMap;
+
+  /**
+   * Constructs a default remote identifier factory that knows the "socket" type
+   */
+  @Inject
+  public DefaultIdentifierFactory() {
+    typeToClazzMap = new HashMap<>();
+    typeToClazzMap.put("socket", SocketRemoteIdentifier.class);
+  }
+
+  /**
+   * Constructs a default remote identifier factory
+   *
+   * @param typeToClazzMap the map of type strings to classes of remote identifiers;
+   *                       the map is used as is, not copied
+   */
+  public DefaultIdentifierFactory(Map<String, Class<? extends Identifier>> typeToClazzMap) {
+    this.typeToClazzMap = typeToClazzMap;
+  }
+
+  /**
+   * Creates a new remote identifier instance
+   *
+   * @param str a string representation of the form "type://details"
+   * @return a remote identifier
+   * @throws RemoteRuntimeException if str has no "://" separator, if its type is
+   *                                not registered with this factory, or if the
+   *                                identifier class cannot be instantiated
+   */
+  @Override
+  public Identifier getNewInstance(String str) {
+    final int index = str.indexOf(TYPE_SEPARATOR);
+    if (index < 0) {
+      throw new RemoteRuntimeException("Invalid name " + str);
+    }
+    final String type = str.substring(0, index);
+    final Class<? extends Identifier> clazz = typeToClazzMap.get(type);
+    if (clazz == null) {
+      // Report an unregistered type explicitly instead of failing with a NullPointerException below.
+      throw new RemoteRuntimeException("Unknown identifier type " + type + " in name " + str);
+    }
+    try {
+      final Constructor<? extends Identifier> constructor = clazz.getDeclaredConstructor(String.class);
+      return constructor.newInstance(str.substring(index + TYPE_SEPARATOR.length()));
+    } catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      // The cause travels with the rethrown exception, so it is not printed here.
+      throw new RemoteRuntimeException(e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java
new file mode 100644
index 0000000..71c123e
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultThreadFactory.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A thread factory that names the threads it creates {@code <prefix>-pool-<n>-thread-<m>}.
+ */
+public final class DefaultThreadFactory implements ThreadFactory {
+  private static final AtomicInteger poolNumber = new AtomicInteger(1);
+  private final ThreadGroup group;
+  private final AtomicInteger threadNumber = new AtomicInteger(1);
+  private final String prefix;
+  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+
+  /**
+   * Constructs a thread factory that installs no uncaught exception handler on its threads.
+   *
+   * @param prefix the name prefix of the created thread
+   */
+  public DefaultThreadFactory(String prefix) {
+    this(prefix, null);
+  }
+
+  /**
+   * Constructs a thread factory using the security manager's thread group, or the caller's if none is installed.
+   *
+   * @param prefix the name prefix of the created thread
+   * @param uncaughtExceptionHandler the uncaught exception handler of the created thread; null installs none
+   */
+  public DefaultThreadFactory(String prefix, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+    final SecurityManager manager = System.getSecurityManager();
+    this.group = (manager == null) ? Thread.currentThread().getThreadGroup() : manager.getThreadGroup();
+    this.prefix = prefix + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
+    this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+  }
+
+  /**
+   * Replaces the handler that newThread installs; the field is neither volatile nor synchronized.
+   *
+   * @param uncaughtExceptionHandler the uncaught exception handler
+   */
+  public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+    this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+  }
+
+  /**
+   * Creates a new non-daemon thread of normal priority with the next sequential name.
+   * @param r the runnable
+   * @return the new, not yet started thread
+   */
+  @Override
+  public Thread newThread(Runnable r) {
+    final Thread thread = new Thread(group, r, prefix + threadNumber.getAndIncrement(), 0);
+    if (thread.isDaemon()) {
+      thread.setDaemon(false);
+    }
+    if (Thread.NORM_PRIORITY != thread.getPriority()) {
+      thread.setPriority(Thread.NORM_PRIORITY);
+    }
+    if (uncaughtExceptionHandler != null) {
+      thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+    }
+    return thread;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ForkPoolStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ForkPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ForkPoolStage.java
new file mode 100644
index 0000000..214beb8
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ForkPoolStage.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.AbstractEStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.StageConfiguration;
+
+import javax.inject.Inject;
+import java.util.concurrent.ForkJoinTask;
+import java.util.logging.Logger;
+
+/**
+ * A Wake event handling stage that uses a {@link java.util.concurrent.ForkJoinPool}
+ * to run every event as its own task. The advantage is that the pool's workers
+ * keep separate queues, load balanced with work stealing, instead of sharing
+ * one queue.
+ * <p>
+ * The {@link WakeSharedPool} is handed to the constructor, so multiple stages may
+ * use the same pool. When one stage submits to another stage on the same pool, the
+ * new event may be executed directly by the submitting thread, which should give
+ * some throughput advantage over other stage implementations.
+ *
+ * @param <T> type of events
+ */
+public class ForkPoolStage<T> extends AbstractEStage<T> {
+  private static final Logger LOG = Logger.getLogger(ForkPoolStage.class.getName());
+
+  private final EventHandler<T> handler;
+  private final WakeSharedPool pool;
+
+  /**
+   * Creates a named stage and registers it with the {@link StageManager}.
+   */
+  @Inject
+  public ForkPoolStage(@Parameter(StageConfiguration.StageName.class) String stageName,
+                       @Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler,
+                       WakeSharedPool sharedPool
+  ) {
+    super(stageName);
+    this.handler = handler;
+    this.pool = sharedPool;
+    //TODO: should WakeSharedPool register its stages?
+    StageManager.instance().register(this);
+  }
+
+  /** Creates a stage named after this class. */
+  @Inject
+  public ForkPoolStage(@Parameter(StageConfiguration.StageHandler.class) EventHandler<T> handler,
+                       WakeSharedPool sharedPool) {
+    this(ForkPoolStage.class.getName(), handler, sharedPool);
+  }
+
+  /** Wraps the event in a result-less task and submits it to the pool. */
+  @Override
+  public void onNext(final T value) {
+    beforeOnNext();
+    pool.submit(new ForkJoinTask<T>() {
+      @Override
+      protected boolean exec() {
+        handler.onNext(value);
+        afterOnNext();
+        return true;
+      }
+
+      @Override
+      public T getRawResult() {
+        // tasks have no results because they are events;
+        // the hook is kept for possible extensions
+        return null;
+      }
+
+      @Override
+      protected void setRawResult(T ignored) {
+        // tasks have no results because they are events
+      }
+    });
+  }
+
+  /** Only logs a warning: the shared pool is not closed from here. */
+  @Override
+  public void close() throws Exception {
+    LOG.warning("close(): " + pool.getClass().getName() + " " + pool + " must really be close()'d");
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java
new file mode 100644
index 0000000..557d82c
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.AbstractEStage;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * This stage uses a thread pool to schedule events in parallel.
+ * Should be used when input events are already materialized in a List and can be fired in any order.
+ * @param <T> type of the individual events
+ */
+public class IndependentIterationsThreadPoolStage<T> extends AbstractEStage<List<T>> {
+  private final int granularity;
+  private final EventHandler<T> handler;
+  private final ExecutorService executor;
+
+  /**
+   * @param handler     handler invoked once per event
+   * @param numThreads  fixed number of threads available in the pool
+   * @param granularity maximum number of events executed serially. The right choice will balance task spawn overhead with parallelism.
+   * @throws IllegalArgumentException if granularity is not positive (zero would make onNext loop forever)
+   */
+  public IndependentIterationsThreadPoolStage(EventHandler<T> handler, int numThreads, int granularity) {
+    super(handler.getClass().getName());
+    if (granularity <= 0) {
+      throw new IllegalArgumentException("granularity must be positive, got " + granularity);
+    }
+    this.handler = handler;
+    this.executor = Executors.newFixedThreadPool(numThreads);
+    this.granularity = granularity;
+  }
+
+  /** Cuts the list into chunks of at most {@code granularity} events and runs each chunk as one pool task. */
+  @Override
+  public void onNext(final List<T> iterations) {
+    final int size = iterations.size();
+    Logger.getAnonymousLogger().info("Execute new task [" + size + "]");
+    for (int i = 0; i < size; i += granularity) {
+      final List<T> chunk = iterations.subList(i, Math.min(i + granularity, size));
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          for (final T e : chunk) {
+            handler.onNext(e);
+          }
+        }
+      });
+    }
+  }
+
+  /** Shuts the executor down, then waits (up to 1000 days) for submitted chunks to finish. */
+  @Override
+  public void close() throws Exception {
+    executor.shutdown();
+    executor.awaitTermination(1000, TimeUnit.DAYS);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingEventHandler.java
new file mode 100644
index 0000000..86269a4
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingEventHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An event handler that writes every event it receives to the log.
+ *
+ * @param <T> type of the logged events
+ */
+public class LoggingEventHandler<T> implements EventHandler<T> {
+  private static final Logger LOG = Logger.getLogger(LoggingEventHandler.class.getName());
+
+  @Inject
+  public LoggingEventHandler() {
+  }
+
+  /**
+   * Logs the event at FINE level.
+   *
+   * @param value an event
+   */
+  @Override
+  public void onNext(final T value) {
+    LOG.log(Level.FINE, "{0}", value);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingUtils.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingUtils.java
new file mode 100644
index 0000000..b85c37c
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helpers for configuring java.util.logging at runtime
+ */
+public class LoggingUtils {
+
+  /**
+   * Sets the level of the root logger and of its first console handler, adding a console handler if there is none
+   *
+   * @param level the logging level
+   */
+  public static void setLoggingLevel(Level level) {
+    final Logger root = Logger.getLogger("");
+    ConsoleHandler console = null;
+    for (final Handler candidate : root.getHandlers()) {
+      if (candidate instanceof ConsoleHandler) {
+        console = (ConsoleHandler) candidate;
+        break;
+      }
+    }
+    if (console == null) {
+      console = new ConsoleHandler();
+      root.addHandler(console);
+    }
+    console.setLevel(level);
+    root.setLevel(level);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingVoidEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingVoidEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingVoidEventHandler.java
new file mode 100644
index 0000000..d9f923f
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/LoggingVoidEventHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An event handler for Void events that only records that it was called
+ */
+public class LoggingVoidEventHandler implements EventHandler<Void> {
+  private static final Logger LOG = Logger.getLogger(LoggingVoidEventHandler.class.getName());
+
+  @Inject
+  public LoggingVoidEventHandler() {
+  }
+
+  /**
+   * Logs a fixed message at FINE level
+   *
+   * @param value an event; not inspected
+   */
+  @Override
+  public void onNext(final Void value) {
+    LOG.fine("Logging void event handler");
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java
new file mode 100644
index 0000000..548ef0d
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An EventHandler combines two events of different types into a single Pair of events.
+ * The first event of a pair is stored and its onNext returns at once; the call that completes the
+ * pair fires the destination. A further event of one side blocks until the stored one is paired.
+ * onNext is thread safe. Null marks an empty slot internally, so a null event arriving first is lost.
+ *
+ * @param <L> type of event accepted by {@link #left}
+ * @param <R> type of event accepted by {@link #right}
+ * @see BlockingEventHandler
+ */
+@Unit
+public final class MergingEventHandler<L, R> {
+
+ private static Logger LOG = Logger.getLogger(MergingEventHandler.class.getName()); // NOTE(review): never reassigned in this class - could be final
+ public final EventHandler<L> left = new Left();
+ public final EventHandler<R> right = new Right();
+ private final Object mutex = new Object(); // lock and wait/notify monitor for the two slots below
+ private final EventHandler<Pair<L, R>> destination;
+ private L leftEvent; // pending left event; null when the slot is empty
+ private R rightEvent; // pending right event; null when the slot is empty
+ /** @param destination receives one Pair for every matched left/right event */
+ @Inject
+ public MergingEventHandler(final EventHandler<Pair<L, R>> destination) {
+ this.destination = destination;
+ reset();
+ }
+
+ /*
+ * Empties both slots. Not thread safe: callers hold mutex, except the constructor which calls it unsynchronized.
+ */
+ private void reset() {
+ rightEvent = null;
+ leftEvent = null;
+ }
+ /** Holder for one merged left/right pair; the private constructor keeps creation inside this handler. */
+ public static class Pair<S1, S2> {
+ public final S1 first;
+ public final S2 second;
+
+ private Pair(S1 s1, S2 s2) {
+ this.first = s1;
+ this.second = s2;
+ }
+ }
+ /** Handler for the left side of the pair. */
+ private class Left implements EventHandler<L> {
+
+ @Override
+ public void onNext(final L event) {
+
+ L leftRef = null;
+ R rightRef = null;
+
+ synchronized (mutex) {
+ // block while an earlier left event is still waiting for its partner
+ while (leftEvent != null) {
+ try {
+ mutex.wait();
+ } catch (final InterruptedException e) {
+ LOG.log(Level.SEVERE, "Wait interrupted.", e); // keeps waiting; the interrupt status is not restored
+ }
+ }
+
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.log(Level.FINEST, "{0} producing left {1}",
+ new Object[]{Thread.currentThread(), event});
+ }
+
+ leftEvent = event;
+ leftRef = event;
+ // partner already waiting: take it, empty both slots and wake blocked producers
+ if (rightEvent != null) {
+ rightRef = rightEvent;
+ reset();
+ mutex.notifyAll();
+ }
+ }
+ // rightRef is only set by the call that completed the pair; delivery happens after leaving the lock
+ if (rightRef != null) {
+ // I get to fire the event
+ destination.onNext(new Pair<L, R>(leftRef, rightRef));
+ }
+ }
+ }
+ /** Handler for the right side of the pair; mirrors Left. */
+ private class Right implements EventHandler<R> {
+
+ @Override
+ public void onNext(final R event) {
+
+ L leftRef = null;
+ R rightRef = null;
+
+ synchronized (mutex) {
+ // block while an earlier right event is still waiting for its partner
+ while (rightEvent != null) {
+ try {
+ mutex.wait();
+ } catch (final InterruptedException e) {
+ LOG.log(Level.SEVERE, "Wait interrupted.", e); // keeps waiting; the interrupt status is not restored
+ }
+ }
+
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.log(Level.FINEST, "{0} producing right {1}",
+ new Object[]{Thread.currentThread(), event});
+ }
+
+ rightEvent = event;
+ rightRef = event;
+ // partner already waiting: take it, empty both slots and wake blocked producers
+ if (leftEvent != null) {
+ leftRef = leftEvent;
+ reset();
+ mutex.notifyAll();
+ }
+ }
+ // leftRef is only set by the call that completed the pair; delivery happens after leaving the lock
+ if (leftRef != null) {
+ // I get to fire the event
+ destination.onNext(new Pair<L, R>(leftRef, rightRef));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MissingStartHandlerHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MissingStartHandlerHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MissingStartHandlerHandler.java
new file mode 100644
index 0000000..6debd97
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MissingStartHandlerHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.impl;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The EventHandler used as the default for the Clock.StartHandler event.
+ */
+public class MissingStartHandlerHandler implements EventHandler<StartTime> {
+
+  @Inject
+  private MissingStartHandlerHandler() {
+  }
+
+  /** Warns that nothing is bound to Clock.StartHandler; {@code value} is not inspected. */
+  @Override
+  public void onNext(final StartTime value) {
+    Logger.getLogger(MissingStartHandlerHandler.class.getName()).log(Level.WARNING,
+        "No binding to Clock.StartHandler. It is likely that the clock will immediately go idle and close.");
+  }
+}