You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ch...@apache.org on 2015/11/14 07:38:34 UTC
incubator-reef git commit: [REEF-924] Use injectors given from Tang
instead of empty ones in groupcomm evaluator-side
Repository: incubator-reef
Updated Branches:
refs/heads/master 4decc2132 -> 03cef56ba
[REEF-924] Use injectors given from Tang instead of empty ones in groupcomm evaluator-side
This addressed the issue by
* adding constructors that use injectors given by Tang instead of empty injectors
* deprecating the original constructors
* adding a test
JIRA:
[REEF-924](https://issues.apache.org/jira/browse/REEF-924)
Pull Request:
Closes #632
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/03cef56b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/03cef56b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/03cef56b
Branch: refs/heads/master
Commit: 03cef56ba87f5bfa9ede262c07e1c4b71ca70147
Parents: 4decc21
Author: Jason (Joo Seong) Jeong <cu...@gmail.com>
Authored: Tue Nov 10 16:59:33 2015 +0900
Committer: Brian Cho <ch...@apache.org>
Committed: Sat Nov 14 15:03:29 2015 +0900
----------------------------------------------------------------------
.../parameters/DriverIdentifierGroupComm.java | 26 ++++
.../driver/CommunicationGroupDriverImpl.java | 3 +-
.../group/impl/operators/BroadcastReceiver.java | 8 +-
.../group/impl/operators/BroadcastSender.java | 8 +-
.../group/impl/operators/GatherReceiver.java | 8 +-
.../group/impl/operators/GatherSender.java | 8 +-
.../group/impl/operators/ReduceReceiver.java | 3 +-
.../group/impl/operators/ReduceSender.java | 3 +-
.../group/impl/operators/ScatterReceiver.java | 8 +-
.../group/impl/operators/ScatterSender.java | 8 +-
.../impl/task/CommunicationGroupClientImpl.java | 63 ++++++++-
.../group/impl/task/GroupCommClientImpl.java | 33 +++++
.../conf/GroupCommServiceInjectionCodec.java | 55 ++++++++
.../conf/GroupCommServiceInjectionDriver.java | 136 +++++++++++++++++++
.../GroupCommServiceInjectionMasterTask.java | 51 +++++++
.../GroupCommServiceInjectionSlaveTask.java | 57 ++++++++
.../reef/tests/group/conf/package-info.java | 22 +++
.../reef/tests/group/GroupCommTestSuite.java | 4 +-
.../conf/TestGroupCommServiceInjection.java | 78 +++++++++++
.../reef/tests/group/conf/package-info.java | 22 +++
20 files changed, 559 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DriverIdentifierGroupComm.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DriverIdentifierGroupComm.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DriverIdentifierGroupComm.java
new file mode 100644
index 0000000..59fc10a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/parameters/DriverIdentifierGroupComm.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "Identifier of the driver, used in group communication")
+public final class DriverIdentifierGroupComm implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
index 74e094c..f5d42c9 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
@@ -271,8 +271,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
final String taskId = taskId(taskConf);
if (perTaskState.containsKey(taskId)) {
- jcb.bindNamedParameter(DriverIdentifier.class, driverId);
jcb.bindNamedParameter(CommunicationGroupName.class, groupName.getName());
+ jcb.bindNamedParameter(DriverIdentifierGroupComm.class, driverId);
LOG.finest(getQualifiedName() + "Task has been added. Waiting to acquire configLock");
synchronized (configLock) {
LOG.finest(getQualifiedName() + "Acquired configLock");
@@ -294,7 +294,6 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
final Topology topology = topologies.get(operName);
final JavaConfigurationBuilder jcbInner = Tang.Factory.getTang()
.newConfigurationBuilder(topology.getTaskConfiguration(taskId));
- jcbInner.bindNamedParameter(DriverIdentifier.class, driverId);
jcbInner.bindNamedParameter(OperatorName.class, operName.getName());
jcb.bindSetEntry(SerializedOperConfigs.class, confSerializer.toString(jcbInner.build()));
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
index 671b06f..321c79b 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
@@ -18,20 +18,16 @@
*/
package org.apache.reef.io.network.group.impl.operators;
-import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
-import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
-import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
-import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
-import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
@@ -69,7 +65,7 @@ public class BroadcastReceiver<T> implements Broadcast.Receiver<T>, EventHandler
@Parameter(OperatorName.class) final String operName,
@Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
@Parameter(DataCodec.class) final Codec<T> dataCodec,
- @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(DriverIdentifierGroupComm.class) final String driverId,
@Parameter(TaskVersion.class) final int version,
final CommGroupNetworkHandler commGroupNetworkHandler,
final NetworkService<GroupCommunicationMessage> netService,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java
index 76178de..eb6f7a8 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java
@@ -18,20 +18,16 @@
*/
package org.apache.reef.io.network.group.impl.operators;
-import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
-import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
-import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
-import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
-import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
@@ -69,7 +65,7 @@ public class BroadcastSender<T> implements Broadcast.Sender<T>, EventHandler<Gro
@Parameter(OperatorName.class) final String operName,
@Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
@Parameter(DataCodec.class) final Codec<T> dataCodec,
- @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(DriverIdentifierGroupComm.class) final String driverId,
@Parameter(TaskVersion.class) final int version,
final CommGroupNetworkHandler commGroupNetworkHandler,
final NetworkService<GroupCommunicationMessage> netService,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
index 834fb79..d7033e7 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
@@ -18,7 +18,6 @@
*/
package org.apache.reef.io.network.group.impl.operators;
-import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
@@ -27,10 +26,7 @@ import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
-import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
-import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
-import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
-import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.impl.NetworkService;
@@ -66,7 +62,7 @@ public class GatherReceiver<T> implements Gather.Receiver<T>, EventHandler<Group
@Parameter(OperatorName.class) final String operName,
@Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
@Parameter(DataCodec.class) final Codec<T> dataCodec,
- @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(DriverIdentifierGroupComm.class) final String driverId,
@Parameter(TaskVersion.class) final int version,
final CommGroupNetworkHandler commGroupNetworkHandler,
final NetworkService<GroupCommunicationMessage> netService,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
index a657b07..c8cf8ad 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
@@ -18,7 +18,6 @@
*/
package org.apache.reef.io.network.group.impl.operators;
-import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
@@ -27,10 +26,7 @@ import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
-import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
-import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
-import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
-import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.impl.NetworkService;
@@ -65,7 +61,7 @@ public class GatherSender<T> implements Gather.Sender<T>, EventHandler<GroupComm
@Parameter(OperatorName.class) final String operName,
@Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
@Parameter(DataCodec.class) final Codec<T> dataCodec,
- @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(DriverIdentifierGroupComm.class) final String driverId,
@Parameter(TaskVersion.class) final int version,
final CommGroupNetworkHandler commGroupNetworkHandler,
final NetworkService<GroupCommunicationMessage> netService,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java
index e4ce44e..4a44020 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java
@@ -18,7 +18,6 @@
*/
package org.apache.reef.io.network.group.impl.operators;
-import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
@@ -70,7 +69,7 @@ public class ReduceReceiver<T> implements Reduce.Receiver<T>, EventHandler<Group
@Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
@Parameter(DataCodec.class) final Codec<T> dataCodec,
@Parameter(ReduceFunctionParam.class) final ReduceFunction<T> reduceFunction,
- @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(DriverIdentifierGroupComm.class) final String driverId,
@Parameter(TaskVersion.class) final int version,
final CommGroupNetworkHandler commGroupNetworkHandler,
final NetworkService<GroupCommunicationMessage> netService,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java
index 7eaaf43..3c45b28 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java
@@ -18,7 +18,6 @@
*/
package org.apache.reef.io.network.group.impl.operators;
-import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
@@ -72,7 +71,7 @@ public class ReduceSender<T> implements Reduce.Sender<T>, EventHandler<GroupComm
@Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
@Parameter(DataCodec.class) final Codec<T> dataCodec,
@Parameter(ReduceFunctionParam.class) final ReduceFunction<T> reduceFunction,
- @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(DriverIdentifierGroupComm.class) final String driverId,
@Parameter(TaskVersion.class) final int version,
final CommGroupNetworkHandler commGroupNetworkHandler,
final NetworkService<GroupCommunicationMessage> netService,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
index 1c814fb..8d15829 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
@@ -18,7 +18,6 @@
*/
package org.apache.reef.io.network.group.impl.operators;
-import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
@@ -27,10 +26,7 @@ import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
-import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
-import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
-import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
-import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.ScatterData;
import org.apache.reef.io.network.group.impl.utils.ScatterDecoder;
@@ -66,7 +62,7 @@ public final class ScatterReceiver<T> implements Scatter.Receiver<T>, EventHandl
@Parameter(OperatorName.class) final String operName,
@Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
@Parameter(DataCodec.class) final Codec<T> dataCodec,
- @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(DriverIdentifierGroupComm.class) final String driverId,
@Parameter(TaskVersion.class) final int version,
final CommGroupNetworkHandler commGroupNetworkHandler,
final NetworkService<GroupCommunicationMessage> netService,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
index 617f8ac..367b7de 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
@@ -18,7 +18,6 @@
*/
package org.apache.reef.io.network.group.impl.operators;
-import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.exception.ParentDeadException;
@@ -27,10 +26,7 @@ import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
import org.apache.reef.io.network.group.api.task.OperatorTopology;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
-import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
-import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
-import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
-import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.config.parameters.*;
import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
import org.apache.reef.io.network.group.impl.utils.ScatterEncoder;
import org.apache.reef.io.network.group.impl.utils.ScatterHelper;
@@ -68,7 +64,7 @@ public final class ScatterSender<T> implements Scatter.Sender<T>, EventHandler<G
@Parameter(OperatorName.class) final String operName,
@Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
@Parameter(DataCodec.class) final Codec<T> dataCodec,
- @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(DriverIdentifierGroupComm.class) final String driverId,
@Parameter(TaskVersion.class) final int version,
final CommGroupNetworkHandler commGroupNetworkHandler,
final NetworkService<GroupCommunicationMessage> netService,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
index 8482268..9bb42bf 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
@@ -18,10 +18,10 @@
*/
package org.apache.reef.io.network.group.impl.task;
-import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.driver.task.TaskConfigurationOptions;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.group.api.operators.*;
+import org.apache.reef.io.network.group.impl.config.parameters.DriverIdentifierGroupComm;
import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode;
import org.apache.reef.io.network.group.impl.driver.TopologySerializer;
import org.apache.reef.io.network.impl.NetworkService;
@@ -79,10 +79,15 @@ public class CommunicationGroupClientImpl implements CommunicationGroupServiceCl
private final AtomicBoolean init = new AtomicBoolean(false);
+ /**
+ * @deprecated in 0.14.
+ * Use the private constructor that receives an {@code injector} as a parameter instead.
+ */
+ @Deprecated
@Inject
public CommunicationGroupClientImpl(@Parameter(CommunicationGroupName.class) final String groupName,
@Parameter(TaskConfigurationOptions.Identifier.class) final String taskId,
- @Parameter(DriverIdentifier.class) final String driverId,
+ @Parameter(DriverIdentifierGroupComm.class) final String driverId,
final GroupCommNetworkHandler groupCommNetworkHandler,
@Parameter(SerializedOperConfigs.class) final Set<String> operatorConfigs,
final ConfigurationSerializer configSerializer,
@@ -135,6 +140,60 @@ public class CommunicationGroupClientImpl implements CommunicationGroupServiceCl
}
}
+ @Inject
+ private CommunicationGroupClientImpl(@Parameter(CommunicationGroupName.class) final String groupName,
+ @Parameter(TaskConfigurationOptions.Identifier.class) final String taskId,
+ @Parameter(DriverIdentifierGroupComm.class) final String driverId,
+ final GroupCommNetworkHandler groupCommNetworkHandler,
+ @Parameter(SerializedOperConfigs.class) final Set<String> operatorConfigs,
+ final ConfigurationSerializer configSerializer,
+ final NetworkService<GroupCommunicationMessage> netService,
+ final CommGroupNetworkHandler commGroupNetworkHandler,
+ final Injector injector) {
+ this.taskId = taskId;
+ this.driverId = driverId;
+ LOG.finest(groupName + " has GroupCommHandler-" + groupCommNetworkHandler.toString());
+ this.identifierFactory = netService.getIdentifierFactory();
+ this.groupName = Utils.getClass(groupName);
+ this.groupCommNetworkHandler = groupCommNetworkHandler;
+ this.commGroupNetworkHandler = commGroupNetworkHandler;
+ this.sender = new Sender(netService);
+ this.operators = new TreeMap<>(new Comparator<Class<? extends Name<String>>>() {
+
+ @Override
+ public int compare(final Class<? extends Name<String>> o1, final Class<? extends Name<String>> o2) {
+ final String s1 = o1.getSimpleName();
+ final String s2 = o2.getSimpleName();
+ return s1.compareTo(s2);
+ }
+ });
+ try {
+ this.groupCommNetworkHandler.register(this.groupName, commGroupNetworkHandler);
+
+ boolean operatorIsScatterSender = false;
+ for (final String operatorConfigStr : operatorConfigs) {
+
+ final Configuration operatorConfig = configSerializer.fromString(operatorConfigStr);
+ final Injector forkedInjector = injector.forkInjector(operatorConfig);
+
+ forkedInjector.bindVolatileInstance(CommunicationGroupServiceClient.class, this);
+
+ final GroupCommOperator operator = forkedInjector.getInstance(GroupCommOperator.class);
+ final String operName = forkedInjector.getNamedInstance(OperatorName.class);
+ this.operators.put(Utils.getClass(operName), operator);
+ LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString());
+
+ if (!operatorIsScatterSender && operator instanceof Scatter.Sender) {
+ LOG.fine(operName + " is a scatter sender. Will keep track of active slave tasks.");
+ operatorIsScatterSender = true;
+ }
+ }
+ this.isScatterSender = operatorIsScatterSender;
+ } catch (final InjectionException | IOException e) {
+ throw new RuntimeException("Unable to deserialize operator config", e);
+ }
+ }
+
@Override
public Broadcast.Sender getBroadcastSender(final Class<? extends Name<String>> operatorName) {
LOG.entering("CommunicationGroupClientImpl", "getBroadcastSender", new Object[]{getQualifiedName(),
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
index 4e635c9..34beda1 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
@@ -48,6 +48,13 @@ public class GroupCommClientImpl implements GroupCommClient {
private final Map<Class<? extends Name<String>>, CommunicationGroupServiceClient> communicationGroups =
new HashMap<>();
+ /**
+ * @deprecated in 0.14.
+ * Use the other constructor that receives an {@code injector} as a parameter instead.
+ * The parameters {@code taskId} and {@code netService} can be removed from the other constructor when
+ * this constructor gets deleted.
+ */
+ @Deprecated
@Inject
public GroupCommClientImpl(
@Parameter(SerializedGroupConfigs.class) final Set<String> groupConfigs,
@@ -78,6 +85,32 @@ public class GroupCommClientImpl implements GroupCommClient {
}
}
+ @Inject
+ private GroupCommClientImpl(@Parameter(SerializedGroupConfigs.class) final Set<String> groupConfigs,
+ @Parameter(TaskConfigurationOptions.Identifier.class) final String taskId,
+ final GroupCommNetworkHandler groupCommNetworkHandler,
+ final NetworkService<ReefNetworkGroupCommProtos.GroupCommMessage> netService,
+ final ConfigurationSerializer configSerializer,
+ final Injector injector) {
+
+ LOG.log(Level.FINEST, "GroupCommHandler-{0}", groupCommNetworkHandler);
+
+ for (final String groupConfigStr : groupConfigs) {
+ try {
+ final Configuration groupConfig = configSerializer.fromString(groupConfigStr);
+ final Injector forkedInjector = injector.forkInjector(groupConfig);
+
+ final CommunicationGroupServiceClient commGroupClient =
+ forkedInjector.getInstance(CommunicationGroupServiceClient.class);
+
+ this.communicationGroups.put(commGroupClient.getName(), commGroupClient);
+
+ } catch (final InjectionException | IOException e) {
+ throw new RuntimeException("Unable to deserialize operator config", e);
+ }
+ }
+ }
+
@Override
public CommunicationGroupClient getCommunicationGroup(
final Class<? extends Name<String>> groupName) {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionCodec.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionCodec.java
new file mode 100644
index 0000000..5bd357a
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionCodec.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group.conf;
+
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tests.group.conf.GroupCommServiceInjectionDriver.GroupCommServiceInjectionParameter;
+
+import javax.inject.Inject;
+import java.nio.ByteBuffer;
+
+/**
+ * Group communication codec used in the GroupCommServiceInjection test.
+ * Adds a certain offset value while decoding data.
+ * The offset should be given via Tang injection.
+ */
+final class GroupCommServiceInjectionCodec implements Codec<Integer> {
+
+ private final int offset;
+
+ @Inject
+ private GroupCommServiceInjectionCodec(@Parameter(GroupCommServiceInjectionParameter.class) final int offset) {
+ this.offset = offset;
+ }
+
+
+ @Override
+ public byte[] encode(final Integer integer) {
+ final ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE);
+ byteBuffer.putInt(integer);
+ return byteBuffer.array();
+ }
+
+ @Override
+ public Integer decode(final byte[] data) {
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+ return byteBuffer.getInt() + offset;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionDriver.java
new file mode 100644
index 0000000..7c93e5b
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionDriver.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group.conf;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
+import org.apache.reef.io.network.group.api.driver.GroupCommDriver;
+import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Driver code for the GroupCommServiceInjection test.
+ * Spawns two evaluators that uses group communication to send and receive a single integer.
+ * To make {@link GroupCommServiceInjectionCodec} instantiate correctly, this binds a Tang configuration that
+ * contains a value for {@link GroupCommServiceInjectionParameter} with the group comm service.
+ */
+@Unit
+final class GroupCommServiceInjectionDriver {
+ static final Integer SEND_INTEGER = 5;
+ static final Integer OFFSET = 10;
+
+ private final EvaluatorRequestor evaluatorRequestor;
+ private final GroupCommDriver groupCommDriver;
+ private final CommunicationGroupDriver commGroupDriver;
+
+ @Inject
+ private GroupCommServiceInjectionDriver(final EvaluatorRequestor evaluatorRequestor,
+ final GroupCommDriver groupCommDriver) {
+ this.evaluatorRequestor = evaluatorRequestor;
+ this.groupCommDriver = groupCommDriver;
+ this.commGroupDriver =
+ groupCommDriver.newCommunicationGroup(GroupCommServiceInjectionGroupName.class, 2);
+
+ this.commGroupDriver
+ .addBroadcast(GroupCommServiceInjectionBroadcast.class,
+ BroadcastOperatorSpec.newBuilder()
+ .setSenderId(GroupCommServiceInjectionMasterTask.TASK_ID)
+ .setDataCodecClass(GroupCommServiceInjectionCodec.class)
+ .build())
+ .finalise();
+ }
+
+ final class StartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime startTime) {
+ evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
+ .setNumber(2)
+ .setMemory(128)
+ .build());
+ }
+ }
+
+ final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+ @Override
+ public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+ allocatedEvaluator.submitContextAndService(
+ groupCommDriver.getContextConfiguration(), groupCommDriver.getServiceConfiguration());
+ }
+ }
+
+ final class ContextActiveHandler implements EventHandler<ActiveContext> {
+ private final AtomicBoolean masterTaskSubmitted = new AtomicBoolean(false);
+
+ @Override
+ public void onNext(final ActiveContext activeContext) {
+ final Configuration paramConf = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindNamedParameter(GroupCommServiceInjectionParameter.class, Integer.toString(OFFSET))
+ .build();
+
+ if (masterTaskSubmitted.compareAndSet(false, true)) {
+ final Configuration masterTaskPartialConf = TaskConfiguration.CONF
+ .set(TaskConfiguration.IDENTIFIER, GroupCommServiceInjectionMasterTask.TASK_ID)
+ .set(TaskConfiguration.TASK, GroupCommServiceInjectionMasterTask.class)
+ .build();
+ commGroupDriver.addTask(masterTaskPartialConf);
+
+ final Configuration masterTaskFinalConf = groupCommDriver.getTaskConfiguration(
+ Configurations.merge(paramConf, masterTaskPartialConf));
+ activeContext.submitTask(masterTaskFinalConf);
+
+ } else {
+ final Configuration slaveTaskPartialConf = TaskConfiguration.CONF
+ .set(TaskConfiguration.IDENTIFIER, GroupCommServiceInjectionSlaveTask.TASK_ID)
+ .set(TaskConfiguration.TASK, GroupCommServiceInjectionSlaveTask.class)
+ .build();
+ commGroupDriver.addTask(slaveTaskPartialConf);
+
+ final Configuration slaveTaskFinalConf = groupCommDriver.getTaskConfiguration(
+ Configurations.merge(paramConf, slaveTaskPartialConf));
+ activeContext.submitTask(slaveTaskFinalConf);
+ }
+ }
+ }
+
+ @NamedParameter(doc = "Named parameter to be used by GroupCommServiceInjectionCodec")
+ final class GroupCommServiceInjectionParameter implements Name<Integer> {
+ }
+
+ @NamedParameter(doc = "Operation name for GroupCommServiceInjection test")
+ final class GroupCommServiceInjectionBroadcast implements Name<String> {
+ }
+
+ @NamedParameter(doc = "GC group name used for GroupCommServiceInjection test")
+ final class GroupCommServiceInjectionGroupName implements Name<String> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionMasterTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionMasterTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionMasterTask.java
new file mode 100644
index 0000000..aea4cf0
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionMasterTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group.conf;
+
+import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.group.api.task.CommunicationGroupClient;
+import org.apache.reef.io.network.group.api.task.GroupCommClient;
+import org.apache.reef.task.Task;
+import org.apache.reef.tests.group.conf.GroupCommServiceInjectionDriver.GroupCommServiceInjectionBroadcast;
+import org.apache.reef.tests.group.conf.GroupCommServiceInjectionDriver.GroupCommServiceInjectionGroupName;
+
+import javax.inject.Inject;
+
+/**
+ * Master task used for the GroupCommServiceInjection test.
+ * Sends a single integer to the {@link GroupCommServiceInjectionSlaveTask}.
+ */
+final class GroupCommServiceInjectionMasterTask implements Task {
+ static final String TASK_ID = "GroupCommServiceInjectionMasterTask";
+
+ private final Broadcast.Sender<Integer> sender;
+
+ @Inject
+ private GroupCommServiceInjectionMasterTask(final GroupCommClient groupCommClient) {
+ final CommunicationGroupClient commGroupClient =
+ groupCommClient.getCommunicationGroup(GroupCommServiceInjectionGroupName.class);
+ this.sender = commGroupClient.getBroadcastSender(GroupCommServiceInjectionBroadcast.class);
+ }
+
+ @Override
+ public byte[] call(final byte[] memento) throws Exception {
+ sender.send(GroupCommServiceInjectionDriver.SEND_INTEGER);
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionSlaveTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionSlaveTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionSlaveTask.java
new file mode 100644
index 0000000..8797dcc
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/GroupCommServiceInjectionSlaveTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group.conf;
+
+import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.group.api.task.CommunicationGroupClient;
+import org.apache.reef.io.network.group.api.task.GroupCommClient;
+import org.apache.reef.task.Task;
+import org.apache.reef.tests.group.conf.GroupCommServiceInjectionDriver.GroupCommServiceInjectionBroadcast;
+import org.apache.reef.tests.group.conf.GroupCommServiceInjectionDriver.GroupCommServiceInjectionGroupName;
+
+import javax.inject.Inject;
+
+/**
+ * Slave task used for the GroupCommServiceInjection test.
+ * Receives a single integer from {@link GroupCommServiceInjectionMasterTask} and checks that
+ * {@link GroupCommServiceInjectionCodec} adds the correct offset to the received integer.
+ */
+final class GroupCommServiceInjectionSlaveTask implements Task {
+ static final String TASK_ID = "GroupCommServiceInjectionSlaveTask";
+
+ private final Broadcast.Receiver<Integer> receiver;
+
+ @Inject
+ private GroupCommServiceInjectionSlaveTask(final GroupCommClient groupCommClient) {
+ final CommunicationGroupClient commGroupClient =
+ groupCommClient.getCommunicationGroup(GroupCommServiceInjectionGroupName.class);
+ this.receiver = commGroupClient.getBroadcastReceiver(GroupCommServiceInjectionBroadcast.class);
+ }
+
+ @Override
+ public byte[] call(final byte[] memento) throws Exception {
+ final int expected = GroupCommServiceInjectionDriver.SEND_INTEGER + GroupCommServiceInjectionDriver.OFFSET;
+ final int received = receiver.receive();
+ if (received != expected) {
+ throw new RuntimeException(String.format("Expected %d but received %d", expected, received));
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/package-info.java
new file mode 100644
index 0000000..e17c15c
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/group/conf/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Classes used in tang configuration related tests for group communication.
+ */
+package org.apache.reef.tests.group.conf;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java
index 8e990cb..88e9fc1 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/GroupCommTestSuite.java
@@ -18,12 +18,14 @@
*/
package org.apache.reef.tests.group;
+import org.apache.reef.tests.group.conf.TestGroupCommServiceInjection;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses({
- TestMultipleCommGroups.class
+ TestMultipleCommGroups.class,
+ TestGroupCommServiceInjection.class
})
public final class GroupCommTestSuite {
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/conf/TestGroupCommServiceInjection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/conf/TestGroupCommServiceInjection.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/conf/TestGroupCommServiceInjection.java
new file mode 100644
index 0000000..dbcd84e
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/conf/TestGroupCommServiceInjection.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.group.conf;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.io.network.group.impl.driver.GroupCommService;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.util.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Launch test to check evaluator-side group comm codecs can receive configuration injections from outside services.
+ */
+public final class TestGroupCommServiceInjection {
+ private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+ /**
+ * Set up the test environment.
+ */
+ @Before
+ public void setUp() throws Exception {
+ this.testEnvironment.setUp();
+ }
+
+ /**
+ * Tear down the test environment.
+ */
+ @After
+ public void tearDown() throws Exception {
+ this.testEnvironment.tearDown();
+ }
+
+ /**
+ * Run the GroupCommServiceInjection test.
+ */
+ @Test
+ public void testGroupCommServiceInjection() {
+ final Configuration driverConf = DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES,
+ EnvironmentUtils.getClassLocation(GroupCommServiceInjectionDriver.class))
+ .set(DriverConfiguration.DRIVER_IDENTIFIER,
+ "TEST_GroupCommServiceInjection")
+ .set(DriverConfiguration.ON_DRIVER_STARTED,
+ GroupCommServiceInjectionDriver.StartHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED,
+ GroupCommServiceInjectionDriver.EvaluatorAllocatedHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_ACTIVE,
+ GroupCommServiceInjectionDriver.ContextActiveHandler.class)
+ .build();
+
+ final Configuration groupCommConf = GroupCommService.getConfiguration();
+ final LauncherStatus state = this.testEnvironment.run(Configurations.merge(driverConf, groupCommConf));
+ Assert.assertTrue("Job state after execution: " + state, state.isSuccess());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/03cef56b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/conf/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/conf/package-info.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/conf/package-info.java
new file mode 100644
index 0000000..3bcdf55
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/group/conf/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tang configuration related tests for group communication.
+ */
+package org.apache.reef.tests.group.conf;