You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@reef.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/06/06 13:55:00 UTC

[jira] [Commented] (REEF-1880) Driver side interfaces: operators and topology

    [ https://issues.apache.org/jira/browse/REEF-1880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503328#comment-16503328 ] 

ASF GitHub Bot commented on REEF-1880:
--------------------------------------

markusweimer closed pull request #1380: [REEF-1880] Driver side interfaces: operators and topology
URL: https://github.com/apache/reef/pull/1380
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Config/OperatorsParameters.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Config/OperatorsParameters.cs
new file mode 100644
index 0000000000..a6dfe2e23d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Config/OperatorsParameters.cs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Network.Elastic.Config.OperatorsParameters
+{
+    [Unstable("0.16", "Parameter may change")]
+    [NamedParameter("Operator Name")]
+    public sealed class OperatorType : Name<string>
+    {
+    }
+
+    [Unstable("0.16", "Parameter may change")]
+    [NamedParameter("Type of the message")]
+    public sealed class MessageType : Name<string>
+    {
+    }
+
+    [Unstable("0.16", "Parameter may change")]
+    [NamedParameter("Operator Id")]
+    public sealed class OperatorId : Name<int>
+    {
+    }
+
+    [Unstable("0.16", "Parameter may change")]
+    [NamedParameter("Number of iterations")]
+    public sealed class NumIterations : Name<int>
+    {
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/DriverMessageType.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/DriverMessageType.cs
new file mode 100644
index 0000000000..f36525c75b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/DriverMessageType.cs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Network.Elastic.Driver
+{
+    /// <summary>
+    /// Possible types of Driver message payloads
+    /// </summary>
+    [Unstable("0.16", "Enum may change")]
+    public enum DriverMessageType : ushort
+    {
+        Failure = 1,
+
+        Ring = 2
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IDriverMessage.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IDriverMessage.cs
new file mode 100644
index 0000000000..510c23ef07
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IDriverMessage.cs
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Network.Elastic.Driver
+{
+    /// <summary>
+    /// Message sent by the Driver to operators on running Tasks. 
+    /// This message contains instructions from the Driver to Tasks.
+    /// </summary>
+    [Unstable("0.16", "API may change")]
+    public interface IDriverMessage
+    {
+        /// <summary>
+        /// The destination task of the message</summary>
+        string Destination { get; }
+
+        /// <summary>
+        /// Operator and situation specific payload of the message
+        /// </summary>
+        IDriverMessagePayload Message { get; }
+
+        /// <summary>
+        /// Utility method to serialize the message for communication
+        /// </summary>
+        byte[] Serialize();
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IDriverMessagePayload.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IDriverMessagePayload.cs
new file mode 100644
index 0000000000..8055a96db4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IDriverMessagePayload.cs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Network.Elastic.Driver
+{
+    /// <summary>
+    /// Payload of Driver messages.
+    /// </summary>
+    [Unstable("0.16", "API may change")]
+    public interface IDriverMessagePayload
+    {
+        /// <summary>
+        /// The type of payload
+        /// </summary>
+        DriverMessageType MessageType { get; }
+
+        /// <summary>
+        /// Utility method to serialize the payload for communication
+        /// </summary>
+        byte[] Serialize();
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IElasticTaskSetService.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IElasticTaskSetService.cs
index dfd87b358b..1164cac8a2 100644
--- a/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IElasticTaskSetService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IElasticTaskSetService.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Network.Elastic.Failures;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities.Attributes;
@@ -51,6 +52,13 @@ public interface IElasticTaskSetService : IFailureResponse
         /// <param name="subscriptionName">The name of the subscription</param>
         void RemoveTaskSetSubscription(string subscriptionName);
 
+        /// <summary>
+        /// Get the subscriptions from the context.
+        /// </summary>
+        /// <param name="activeContext">An activeContext</param>
+        /// <returns>The Subscription of the context</returns>
+        string GetContextSubscriptions(IActiveContext activeContext);
+
         /// <summary>
         /// Generate the service configuration object.
         /// This method is used to properly configure the Context with the service.
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IElasticTaskSetSubscription.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IElasticTaskSetSubscription.cs
index fbc1c48bd9..472c067dae 100644
--- a/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IElasticTaskSetSubscription.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/IElasticTaskSetSubscription.cs
@@ -19,6 +19,7 @@
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Network.Elastic.Failures;
 using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Network.Elastic.Operators.Logical.Impl;
 
 namespace Org.Apache.REEF.Network.Elastic.Driver
 {
@@ -35,6 +36,11 @@ public interface IElasticTaskSetSubscription : IFailureResponse
         /// </summary>
         string SubscriptionName { get; }
 
+        /// <summary>
+        /// The operator at the beginning of the computation workflow.
+        /// </summary>
+        ElasticOperator RootOperator { get; }
+
         /// <summary>
         /// The Failure State of the target Subscription. 
         /// </summary>
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/ITaskSetManager.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/ITaskSetManager.cs
index 4d4e8063d8..ab0abf2595 100644
--- a/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/ITaskSetManager.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Driver/ITaskSetManager.cs
@@ -57,14 +57,14 @@ public interface ITaskSetManager : IFailureResponse, IDisposable
         /// </summary>
         /// <param name="evaluator">The evaluator the context will run on</param>
         /// <returns>A new unique context id</returns>
-        int GetNextTaskContextId(IAllocatedEvaluator evaluator = null);
+        string GetNextTaskContextId(IAllocatedEvaluator evaluator);
 
         /// <summary>
         /// Method used to generate unique task ids.
         /// </summary>
         /// <param name="context">The context the task will run on</param>
         /// <returns>A new task id</returns>
-        int GetNextTaskId(IActiveContext context = null);
+        string GetNextTaskId(IActiveContext context);
 
         /// <summary>
         /// Finalizes the Task Set.
@@ -113,6 +113,12 @@ public interface ITaskSetManager : IFailureResponse, IDisposable
         /// </summary>
         bool Done();
 
+        /// <summary>
+        /// Used to react to a failure of a task.
+        /// </summary>
+        /// <param name="info">The failed task</param>
+        void OnTaskFailure(IFailedTask info);
+
         /// <summary>
         /// Used to react of a failure event occurred on an evaluator.
         /// </summary>
@@ -122,6 +128,7 @@ public interface ITaskSetManager : IFailureResponse, IDisposable
         /// <summary>
         /// Contains the logic to trigger when the execution fails.
         /// </summary>
-        void OnFail();
+        /// <param name="taskId">The id of the task triggering the fail</param>
+        void OnFail(string taskId);
     }
 }
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/CheckpointLevel.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/CheckpointLevel.cs
new file mode 100644
index 0000000000..7535cb28a2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/CheckpointLevel.cs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Network.Elastic.Failures
+{
+    /// <summary>
+    /// Definition of supported checkpointing policies
+    /// </summary>
+    [Unstable("0.16", "Enum may change")]
+    public enum CheckpointLevel : int
+    {
+        None = 0,
+
+        Memory = 1
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureEvent.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureEvent.cs
index 61cc8ba588..2f8c11729e 100644
--- a/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureEvent.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureEvent.cs
@@ -29,11 +29,20 @@ namespace Org.Apache.REEF.Network.Elastic.Failures
     public interface IFailureEvent
     {
         /// <summary>
-        /// The event / action rised by the transition to a new failure state.
+        /// The event / action raised by the transition to the new failure state.
         /// It is assumed that the result encodes the magnituted of the action, 
         /// e.g., smaller number, less demanding action.
         /// </summary>
-        /// <returns>A value identifing the magnitued of the event</returns>
         int FailureEvent { get; }
+
+        /// <summary>
+        /// The Task id where the failure occurred
+        /// </summary>
+        string TaskId { get; }
+
+        /// <summary>
+        /// The Operator id where the failure occurred
+        /// </summary>
+        int OperatorId { get; }
     }
 }
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureResponse.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureResponse.cs
index b31f118384..aa2e71efea 100644
--- a/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureResponse.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureResponse.cs
@@ -16,7 +16,9 @@
 // under the License.
 
 using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Network.Elastic.Driver;
 using Org.Apache.REEF.Utilities.Attributes;
+using System.Collections.Generic;
 
 namespace Org.Apache.REEF.Network.Elastic.Failures
 {
@@ -29,16 +31,21 @@ public interface IFailureResponse
     {
         /// <summary>
         /// Used to react on a failure occurred on a task.
+        /// It gets a failed task as input and in response it produces zero or more failure events
         /// </summary>
-        /// <param name="info">The failed task</param>
-        /// <returns>The failure state after the notification of the failed task</returns>
-        IFailureState OnTaskFailure(IFailedTask task);
+        /// <param name="task">The failed task</param>
+        /// <param name="failureEvents">A list of events encoding the type of action to be triggered</param>
+        /// <remarks>Zero or more events for triggering failure mitigation mechanisms are reported through <paramref name="failureEvents"/></remarks>
+        void OnTaskFailure(IFailedTask task, ref List<IFailureEvent> failureEvents);
 
         /// <summary>
         /// When a new failure state is rised, this method is used to dispatch
         /// such event to the proper failure mitigation logic.
+        /// It gets a failure event as input and produces zero or more failure response messages for tasks
         /// </summary>
-        /// <param name="event">Notification specifiying the updated failure state</param>
-        void EventDispatcher(IFailureEvent @event);
+        /// <param name="event">The failure event to react upon</param>
+        /// <param name="failureResponses">A list of messages containing the recovery instructions for the tasks still alive</param>
+        /// <remarks>Zero or more messages for the tasks are reported through <paramref name="failureResponses"/></remarks>
+        void EventDispatcher(IFailureEvent @event, ref List<IDriverMessage> failureResponses);
     }
 }
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureStateMachine.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureStateMachine.cs
index 3126f63e24..9eebb50864 100644
--- a/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureStateMachine.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Failures/IFailureStateMachine.cs
@@ -72,7 +72,7 @@ public interface IFailureStateMachine
         /// Remove data point(s) from the Failure Machine as a result of a runtime failure.
         /// </summary>
         /// <param name="points">How many data point to remove</param>
-        /// <returns>The failure state resulting from the removal of the data points</returns>
+        /// <returns>A failure event resulting from the removal of the data points</returns>
         IFailureState RemoveDataPoints(int points);
 
         /// <summary>
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/Impl/ElasticOperator.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/Impl/ElasticOperator.cs
new file mode 100644
index 0000000000..20833cd1cf
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Operators/Logical/Impl/ElasticOperator.cs
@@ -0,0 +1,427 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Network.Elastic.Driver;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Network.Elastic.Failures;
+using Org.Apache.REEF.Utilities.Logging;
+using System.Globalization;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Network.Elastic.Config;
+using Org.Apache.REEF.Tang.Util;
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Network.Elastic.Topology.Logical;
+using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Network.Elastic.Config.OperatorsParameters;
+
+namespace Org.Apache.REEF.Network.Elastic.Operators.Logical.Impl
+{
+    /// <summary>
+    /// Basic implementation for logical operators.
+    /// Each operator is part of a subscription and is parametrized by a topology, a failure
+    /// state machine and a checkpoint policy.
+    /// </summary>
+    [Unstable("0.16", "API may change")]
+    public abstract class ElasticOperator : IFailureResponse
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ElasticOperator));
+
+        // For the moment we consider only linear sequences of operators (no branching for e.g., joins)
+        protected ElasticOperator _next = null;
+        protected ElasticOperator _prev = null;
+
+        protected IFailureStateMachine _failureMachine;
+        protected CheckpointLevel _checkpointLevel;
+        protected ITopology _topology;
+
+        protected bool _stateFinalized = false;
+        protected bool _operatorFinalized = false;
+
+        protected IElasticTaskSetSubscription _subscription;
+        protected int _id;
+
+        protected IConfiguration[] _configurations;
+
+        /// <summary>
+        /// Specification for generic Elastic Operators
+        /// </summary>
+        /// <param name="subscription">The subscription this operator is part of</param>
+        /// <param name="prev">The previous operator in the pipeline</param>
+        /// <param name="topology">The topology of the operator</param>
+        /// <param name="failureMachine">The behaviour of the operator under failures</param>
+        /// <param name="checkpointLevel">The checkpoint policy for the operator</param>
+        /// <param name="configurations">Additional configuration parameters</param>
+        public ElasticOperator(
+            IElasticTaskSetSubscription subscription,
+            ElasticOperator prev,
+            ITopology topology,
+            IFailureStateMachine failureMachine,
+            CheckpointLevel checkpointLevel = CheckpointLevel.None,
+            params IConfiguration[] configurations)
+        {
+            _subscription = subscription;
+            _prev = prev;
+            _id = Subscription.GetNextOperatorId();
+            _topology = topology;
+            _failureMachine = failureMachine;
+            _checkpointLevel = checkpointLevel;
+            _configurations = configurations;
+        }
+
+        /// <summary>
+        /// The identifier of the master / coordinator node for this operator
+        /// </summary>
+        public int MasterId { get; protected set; }
+
+        /// <summary>
+        /// An operator type specific name
+        /// </summary>
+        public string OperatorName { get; protected set; }
+
+        /// <summary>
+        /// The Subscription this Operator is part of
+        /// </summary>
+        protected IElasticTaskSetSubscription Subscription
+        {
+            get
+            {
+                if (_subscription == null)
+                {
+                    if (_prev == null)
+                    {
+                        throw new IllegalStateException("The reference to the parent subscription is lost");
+                    }
+
+                    return _prev.Subscription;
+                }
+
+                return _subscription;
+            }
+        }
+
+        /// <summary>
+        /// Add a task to the Operator.
+        /// The Operator must have called Build() before adding tasks.
+        /// </summary>
+        /// <param name="taskId">The id of the task to add</param>
+        /// <returns>True if the task is added to the Operator</returns>
+        public virtual bool AddTask(string taskId)
+        {
+            if (_operatorFinalized == false)
+            {
+                throw new IllegalStateException("Operator needs to be built before adding tasks");
+            }
+
+            _topology.AddTask(taskId);
+
+            // The assumption for the moment is that only one data point is added for each task
+            _failureMachine.AddDataPoints(1);
+
+            if (_next != null)
+            {
+                _next.AddTask(taskId);
+            }
+
+            return true;
+        }
+
+        /// <summary>
+        /// Appends the Operators configuration for the input task to the input builder.
+        /// Must be called only after Build() and BuildState() have been called.
+        /// This method should be called from the root operator at the beginning of the pipeline
+        /// </summary>
+        /// <param name="builder">The configuration builder the Operator configuration will be appended to</param>
+        /// <param name="taskId">The task id of the task that belongs to this Operator</param>
+        /// <remarks>The Operators information for the Task is added to <paramref name="builder"/></remarks>
+        public void GetTaskConfiguration(ref ICsConfigurationBuilder builder, int taskId)
+        {
+            if (_operatorFinalized && _stateFinalized)
+            {
+                GetOperatorConfiguration(ref builder, taskId);
+
+                if (_next != null)
+                {
+                    _next.GetTaskConfiguration(ref builder, taskId);
+                }
+            }
+            else
+            {
+                throw new IllegalStateException("Operator needs to be built before getting tasks configuration");
+            }
+        }
+
+        /// <summary>
+        /// Finalizes the Operator.
+        /// </summary>
+        /// <returns>The same finalized Operator</returns>
+        public virtual ElasticOperator Build()
+        {
+            if (_operatorFinalized == true)
+            {
+                throw new IllegalStateException("Operator cannot be built more than once");
+            }
+
+            if (_prev != null)
+            {
+                _prev.Build();
+            }
+
+            _operatorFinalized = true;
+
+            return this;
+        }
+
+        /// <summary>
+        /// Finalizes the Operator state. After BuildState, no more tasks can be added
+        /// to the Operator
+        /// </summary>
+        /// <returns>The same Operator with the finalized state</returns>
+        public virtual ElasticOperator BuildState()
+        {
+            if (_stateFinalized == true)
+            {
+                throw new IllegalStateException("Operator cannot be built more than once");
+            }
+
+            if (_operatorFinalized != true)
+            {
+                throw new IllegalStateException("Operator need to be build before finalizing its state");
+            }
+
+            if (_next != null)
+            {
+                _next.BuildState();
+            }
+
+            _failureMachine.Build();
+            _topology.Build();
+
+            LogOperatorState();
+
+            _stateFinalized = true;
+
+            return this;
+        }
+
+        /// <summary>
+        /// Adds the Broadcast Operator to the operator pipeline.
+        /// </summary>
+        /// <typeparam name="T">The type of messages that operators will send / receive</typeparam>
+        /// <param name="senderId">The id of the sender node</param>
+        /// <param name="topology">The topology of the operator</param>
+        /// <param name="failureMachine">The failure state machine of the operator</param>
+        /// <param name="checkpointLevel">The checkpoint policy for the operator</param>
+        /// <param name="configurations">The configuration of the tasks</param>
+        /// <returns>The same operator pipeline with the added Broadcast operator</returns>
+        public abstract ElasticOperator Broadcast<T>(int senderId, ITopology topology = null, IFailureStateMachine failureMachine = null, CheckpointLevel checkpointLevel = CheckpointLevel.None, params IConfiguration[] configurations);
+
+        /// <summary>
+        /// Adds an instance of the Broadcast Operator to the operator pipeline.
+        /// </summary>
+        /// <typeparam name="T">The type of messages that operators will send / receive</typeparam>
+        /// <param name="senderId">The id of the sender node</param>
+        /// <param name="configurations">The configuration of the tasks</param>
+        /// <returns>The same operator pipeline with the added Broadcast operator</returns>
+        public ElasticOperator Broadcast<T>(int senderId, params IConfiguration[] configurations)
+        {
+            return Broadcast<T>(senderId, null, _failureMachine.Clone(), CheckpointLevel.None, configurations);
+        }
+
+        /// <summary>
+        /// Adds an instance of the Aggregation Ring Operator to the operator pipeline.
+        /// </summary>
+        /// <typeparam name="T">The type of messages that operators will send / receive</typeparam>
+        /// <param name="coordinatorId">The id of the coordinator node starting the ring</param>
+        /// <param name="failureMachine">The failure state machine of the operator</param>
+        /// <param name="checkpointLevel">The checkpoint policy for the operator</param>
+        /// <param name="configurations">The configuration of the tasks</param>
+        /// <returns>The same operator pipeline with the added Aggregation Ring operator</returns>
+        public abstract ElasticOperator AggregationRing<T>(int coordinatorId, IFailureStateMachine failureMachine = null, CheckpointLevel checkpointLevel = CheckpointLevel.None, params IConfiguration[] configurations);
+
+        /// <summary>
+        /// Adds an instance of the Aggregation Ring Operator to the operator pipeline.
+        /// </summary>
+        /// <typeparam name="T">The type of messages that operators will send / receive</typeparam>
+        /// <param name="configurations">The configuration of the tasks</param>
+        /// <returns>The same operator pipeline with the added Aggregation Ring operator</returns>
+        public ElasticOperator AggregationRing<T>(params IConfiguration[] configurations)
+        {
+            return AggregationRing<T>(MasterId, _failureMachine.Clone(), CheckpointLevel.None, configurations);
+        }
+
+        /// <summary>
+        /// Adds an instance of the Aggregation Ring Operator to the operator pipeline.
+        /// </summary>
+        /// <typeparam name="T">The type of messages that operators will send / receive</typeparam>
+        /// <param name="checkpointLevel">The checkpoint policy for the operator</param>
+        /// <param name="configurations">The configuration of the tasks</param>
+        /// <returns>The same operator pipeline with the added Aggregation Ring operator</returns>
+        public ElasticOperator AggregationRing<T>(CheckpointLevel checkpointLevel, params IConfiguration[] configurations)
+        {
+            return AggregationRing<T>(MasterId, _failureMachine.Clone(), checkpointLevel, configurations);
+        }
+
+        /// <summary>
+        /// Adds an instance of the Enumerable Iterate Operator to the operator pipeline.
+        /// This Operator iterates a user-provided number of times.
+        /// </summary>
+        /// <param name="masterId">The id of the node coordinating the iterations</param>
+        /// <param name="failureMachine">The failure state machine of the operator</param>
+        /// <param name="checkpointLevel">The checkpoint policy for the operator</param>
+        /// <param name="configurations">The configuration of the tasks</param>
+        /// <returns>The same operator pipeline with the added Enumerable Iterate operator</returns>
+        public abstract ElasticOperator EnumerableIterate(int masterId, IFailureStateMachine failureMachine = null, CheckpointLevel checkpointLevel = CheckpointLevel.None, params IConfiguration[] configurations);
+
+        /// <summary>
+        /// Adds an instance of the Enumerable Iterate Operator to the operator pipeline.
+        /// This Operator iterates a user-provided number of times.
+        /// </summary>
+        /// <param name="failureMachine">The failure state machine of the operator</param>
+        /// <param name="checkpointLevel">The checkpoint policy for the operator</param>
+        /// <param name="configurations">The configuration of the tasks</param>
+        /// <returns>The same operator pipeline with the added Enumerable Iterate operator</returns>
+        public ElasticOperator Iterate(IFailureStateMachine failureMachine, CheckpointLevel checkpointLevel, params IConfiguration[] configurations)
+        {
+            return EnumerableIterate(MasterId, failureMachine, checkpointLevel, configurations);
+        }
+
+        /// <summary>
+        /// Adds an instance of the Enumerable Iterate Operator to the operator pipeline, forwarding the current MasterId,
+        /// a clone of the current failure machine and CheckpointLevel.None. This operator iterates a user-provided number of times.
+        /// </summary>
+        /// <param name="configurations">The configuration of the tasks</param>
+        /// <returns>The same operator pipeline with the added Enumerable Iterate operator</returns>
+        public ElasticOperator Iterate(params IConfiguration[] configurations)
+        {
+            return EnumerableIterate(MasterId, _failureMachine.Clone(), CheckpointLevel.None, configurations);
+        }
+
+        public abstract void OnTaskFailure(IFailedTask task, ref List<IFailureEvent> failureEvents);
+
+        public abstract void EventDispatcher(IFailureEvent @event, ref List<IDriverMessage> failureResponses);
+
+        /// <summary>
+        /// Builds the configuration of this Operator for the given task and serializes it into the input builder.
+        /// The result combines the topology settings, the physical operator binding, the operator id and the stored configurations.
+        /// </summary>
+        /// <param name="builder">The configuration builder the Operator configuration will be appended to</param>
+        /// <param name="taskId">The task id of the task that belongs to this Operator</param>
+        /// <remarks>Nothing is returned: the serialized configuration is handed over through the by-reference builder.</remarks>
+        protected virtual void GetOperatorConfiguration(ref ICsConfigurationBuilder builder, int taskId)
+        {
+            // A separate builder is filled first; the caller's builder is only touched by the final serialization call.
+            ICsConfigurationBuilder localBuilder = TangFactory.GetTang().NewConfigurationBuilder();
+            _topology.GetTaskConfiguration(ref localBuilder, taskId);
+            PhysicalOperatorConfiguration(ref localBuilder);
+
+            // The operator id is bound last, rendered with the invariant culture.
+            var withOperatorId = localBuilder.BindNamedParameter<OperatorId, int>(
+                GenericType<OperatorId>.Class, _id.ToString(CultureInfo.InvariantCulture));
+            IConfiguration merged = withOperatorId.Build();
+
+            // Fold every stored configuration into the operator configuration, one at a time.
+            foreach (var additional in _configurations)
+            {
+                merged = Configurations.Merge(merged, additional);
+            }
+
+            // Hand the merged configuration to the subscription's service for serialization.
+            Subscription.Service.SerializeOperatorConfiguration(ref builder, merged);
+        }
+
+        /// <summary>
+        /// Binding from logical to physical operator.
+        /// </summary>
+        /// <param name="builder">The configuration builder the binding will be added to</param>
+        /// <remarks>Declared void: implementations are expected to add the binding through the by-reference builder.</remarks>
+        protected abstract void PhysicalOperatorConfiguration(ref ICsConfigurationBuilder builder);
+
+        /// <summary>
+        /// Binds the first generic type argument of the input type as the MessageType named parameter; other types are ignored.
+        /// </summary>
+        /// <param name="operatorType">The generic type whose first type argument is the message type</param>
+        /// <param name="confBuilder">The configuration builder the message type will be added to</param>
+        protected static void SetMessageType(Type operatorType, ref ICsConfigurationBuilder confBuilder)
+        {
+            // Open generic definitions report IsGenericType but carry no type arguments: nothing to bind.
+            if (!operatorType.IsGenericType || operatorType.GenericTypeArguments.Length == 0)
+            {
+                return;
+            }
+
+            confBuilder.BindNamedParameter<MessageType, string>(
+                GenericType<MessageType>.Class, operatorType.GenericTypeArguments[0].AssemblyQualifiedName);
+        }
+
+        /// <summary>
+        /// Writes the operator name, its subscription, the topology state and the failure machine state to the log at Info level.
+        /// </summary>
+        protected virtual void LogOperatorState()
+        {
+            string header = string.Format(CultureInfo.InvariantCulture,
+               "State for Operator {0} in Subscription {1}:\n", OperatorName, Subscription.SubscriptionName);
+            string topology = string.Format(CultureInfo.InvariantCulture, "Topology:\n{0}", _topology.LogTopologyState());
+            string failures = string.Concat("Failure State: ", _failureMachine.State.FailureState,
+                    "\nFailure(s) Reported: ", _failureMachine.NumOfFailedDataPoints);
+
+            LOGGER.Log(Level.Info, string.Concat(header, topology, failures));
+        }
+
+        /// <summary>
+        /// Returns whether a failure should be propagated to downstream operators or not.
+        /// </summary>
+        /// <returns>True if the failure has to be sent downstream; this default implementation always returns true</returns>
+        protected virtual bool PropagateFailureDownstream()
+        {
+            return true;
+        }
+
+        /// <summary>
+        /// Operator specific logic for reacting when a task message is received.
+        /// </summary>
+        /// <param name="message">Incoming message from a task</param>
+        /// <param name="returnMessages">Zero or more reply messages for the task; left untouched by this default implementation</param>
+        /// <returns>True if the operator has reacted to the task message; this default implementation always returns false</returns>
+        protected virtual bool ReactOnTaskMessage(ITaskMessage message, ref List<IDriverMessage> returnMessages)
+        {
+            return false;
+        }
+
+        /// <summary>
+        /// Adds the master task id of this operator to the input set, then asks the next operator (if any) to do the same.
+        /// </summary>
+        /// <param name="masterTasks">The set the master task ids are added to</param>
+        internal virtual void GatherMasterIds(ref HashSet<string> masterTasks)
+        {
+            if (!(_operatorFinalized == true))
+            {
+                throw new IllegalStateException("Operator need to be build before finalizing the subscription");
+            }
+
+            masterTasks.Add(Utils.BuildTaskId(Subscription.SubscriptionName, MasterId));
+            var following = _next;
+            if (following != null)
+            {
+                following.GatherMasterIds(ref masterTasks);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/ITopology.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/ITopology.cs
new file mode 100644
index 0000000000..d8539e543a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/ITopology.cs
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Network.Elastic.Driver;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Attributes;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Network.Elastic.Topology.Logical
+{
+    /// <summary>
+    /// Represents a topology graph for Elastic Group Communication Operators.
+    /// </summary>
+    [Unstable("0.16", "API may change")]
+    public interface ITopology
+    {
+        /// <summary>
+        /// Adds a new task to the topology.
+        /// When called before Build(), the task is actually added to the topology.
+        /// After Build(), the task is assumed to be added because it recovered from a failure.
+        /// </summary>
+        /// <param name="taskId">The id of the task to be added</param>
+        /// <returns>The number of data points linked with the added task id</returns>
+        int AddTask(string taskId);
+
+        /// <summary>
+        /// Removes a task from the topology.
+        /// </summary>
+        /// <param name="taskId">The id of the task to be removed</param>
+        /// <returns>The number of data points lost because of the removed task id</returns>
+        int RemoveTask(string taskId);
+
+        /// <summary>
+        /// Finalizes the Topology.
+        /// After the Topology has been finalized, any task added to the topology is
+        /// assumed to be a task recovered from a failure.
+        /// </summary>
+        /// <returns>The same finalized Topology</returns>
+        ITopology Build();
+
+        /// <summary>
+        /// Adds the topology configuration for the input task to the input builder.
+        /// Must be called only after all tasks have been added to the Topology, i.e., after Build().
+        /// </summary>
+        /// <param name="builder">The configuration builder the configuration will be appended to</param>
+        /// <param name="taskId">The task id of the task that belongs to this Topology</param>
+        void GetTaskConfiguration(ref ICsConfigurationBuilder builder, int taskId);
+
+        /// <summary>
+        /// Utility method for logging the topology state.
+        /// This will be called every time a topology object is built or modified because of a failure.
+        /// </summary>
+        /// <returns>A string describing the current topology state</returns>
+        string LogTopologyState();
+
+        /// <summary>
+        /// Reconfigures the topology in response to some event.
+        /// </summary>
+        /// <param name="taskId">The task id responsible for the topology change</param>
+        /// <param name="info">Some additional topology-specific information</param>
+        /// <returns>One or more messages for reconfiguring the Tasks</returns>
+        List<IDriverMessage> Reconfigure(string taskId, string info);
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/TopologyType.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/TopologyType.cs
new file mode 100644
index 0000000000..fd15608804
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Topology/Logical/TopologyType.cs
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.Network.Elastic.Topology.Logical
+{
+    /// <summary>
+    /// Defines the supported types of topologies; each member has an explicitly assigned numeric value.
+    /// </summary>
+    public enum TopologyType
+    {
+        Flat = 0,
+
+        Tree = 1,
+
+        Ring = 2
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Elastic/Utils.cs b/lang/cs/Org.Apache.REEF.Network/Elastic/Utils.cs
new file mode 100644
index 0000000000..a6edefebfd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Elastic/Utils.cs
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Utilities.Attributes;
+using System.Globalization;
+
+namespace Org.Apache.REEF.Network.Elastic
+{
+    /// <summary>
+    /// Static helpers for building identifiers.
+    /// </summary>
+    [Unstable("0.16", "Class may change or be removed")]
+    internal static class Utils
+    {
+        /// <summary>
+        /// Creates the identifier of a task from a subscription name and a numeric id.
+        /// </summary>
+        /// <param name="subscriptionName">The subscriptions active in the task</param>
+        /// <param name="id">The task id</param>
+        /// <returns>An identifier of the form <c>Task-{subscriptionName}-{id}</c></returns>
+        public static string BuildTaskId(string subscriptionName, int id)
+        {
+            return BuildIdentifier("Task", subscriptionName, id);
+        }
+
+        /// <summary>
+        /// Joins the three input fields with dashes to form an identifier.
+        /// </summary>
+        /// <param name="first">The first field</param>
+        /// <param name="second">The second field</param>
+        /// <param name="third">The third field, rendered with the invariant culture</param>
+        /// <returns>The string <c>{first}-{second}-{third}</c></returns>
+        private static string BuildIdentifier(string first, string second, int third)
+        {
+            return string.Join("-", first, second, third.ToString(CultureInfo.InvariantCulture));
+        }
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 30bc685508..cc045c6124 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -78,6 +78,11 @@ under the License.
     <Compile Include="$(SolutionDir)\SharedAssemblyInfo.cs">
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
+    <Compile Include="Elastic\Utils.cs" />
+    <Compile Include="Elastic\Config\OperatorsParameters.cs" />
+    <Compile Include="Elastic\Driver\IDriverMessagePayload.cs" />
+    <Compile Include="Elastic\Driver\IDriverMessage.cs" />
+    <Compile Include="Elastic\Driver\DriverMessageType.cs" />
     <Compile Include="Elastic\Driver\ITaskSetManager.cs" />
     <Compile Include="Elastic\Driver\IElasticTaskSetSubscription.cs" />
     <Compile Include="Elastic\Driver\IElasticTaskSetService.cs" />
@@ -85,6 +90,10 @@ under the License.
     <Compile Include="Elastic\Failures\IFailureState.cs" />
     <Compile Include="Elastic\Failures\IFailureEvent.cs" />
     <Compile Include="Elastic\Failures\IFailureResponse.cs" />
+    <Compile Include="Elastic\Failures\CheckpointLevel.cs" />
+    <Compile Include="Elastic\Operators\Logical\Impl\ElasticOperator.cs" />
+    <Compile Include="Elastic\Topology\Logical\TopologyType.cs" />
+    <Compile Include="Elastic\Topology\Logical\ITopology.cs" />
     <Compile Include="Group\Config\CodecToStreamingCodecConfiguration.cs" />
     <Compile Include="Group\Config\StreamingCodecConfigurationMinusMessage.cs" />
     <Compile Include="Group\Driver\Impl\GeneralGroupCommunicationMessage.cs" />


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Driver side interfaces: operators and topology
> ----------------------------------------------
>
>                 Key: REEF-1880
>                 URL: https://issues.apache.org/jira/browse/REEF-1880
>             Project: REEF
>          Issue Type: Sub-task
>          Components: GroupCommunications
>            Reporter: Matteo Interlandi
>            Assignee: Matteo Interlandi
>            Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)