You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/05/24 17:35:26 UTC
incubator-reef git commit: [REEF-277] Remove unused code paths in the
C# Driver
Repository: incubator-reef
Updated Branches:
refs/heads/master 75f25a267 -> 5d5771456
[REEF-277] Remove unused code paths in the C# Driver
This removes traces of a planned, but never executed, full Driver implementation
in C#.
JIRA:
[REEF-277](https://issues.apache.org/jira/browse/REEF-277)
Pull Request:
This closes #192
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/5d577145
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/5d577145
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/5d577145
Branch: refs/heads/master
Commit: 5d5771456bae780b4f0781cf6a2e0d75addc5694
Parents: 75f25a2
Author: Yingda Chen <yd...@gmail.com>
Authored: Sat May 23 10:51:52 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Sun May 24 08:32:42 2015 -0700
----------------------------------------------------------------------
.../Api/IResourceLaunchHandler.cs | 29 -
.../Api/IResourceReleaseHandler.cs | 29 -
.../Api/IResourceRequestHandler.cs | 29 -
.../ClientJobStatusHandler.cs | 141 ----
.../EvaluatorHeartBeatSanityChecker.cs | 56 --
.../Org.Apache.REEF.Common.csproj | 5 -
lang/cs/Org.Apache.REEF.Driver/ClientManager.cs | 44 --
.../Context/EvaluatorContext.cs | 148 -----
lang/cs/Org.Apache.REEF.Driver/DriverManager.cs | 537 ---------------
.../DriverRuntimeConfiguration.cs | 62 --
.../DriverRuntimeConfigurationOptions.cs | 44 --
.../Org.Apache.REEF.Driver/EvaluatorManager.cs | 653 -------------------
.../Org.Apache.REEF.Driver.csproj | 7 -
.../Task/RunningTaskImpl.cs | 114 ----
14 files changed, 1898 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/Api/IResourceLaunchHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Api/IResourceLaunchHandler.cs b/lang/cs/Org.Apache.REEF.Common/Api/IResourceLaunchHandler.cs
deleted file mode 100644
index 53c0b8b..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Api/IResourceLaunchHandler.cs
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
-
-namespace Org.Apache.REEF.Common.Api
-{
- [Obsolete("Driver core logic no longer needed in.NET")]
- public interface IResourceLaunchHandler : IObserver<ResourceLaunchProto>
- {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/Api/IResourceReleaseHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Api/IResourceReleaseHandler.cs b/lang/cs/Org.Apache.REEF.Common/Api/IResourceReleaseHandler.cs
deleted file mode 100644
index 6712634..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Api/IResourceReleaseHandler.cs
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
-
-namespace Org.Apache.REEF.Common.Api
-{
- [Obsolete("Driver core logic no longer needed in.NET")]
- public interface IResourceReleaseHandler : IObserver<ResourceReleaseProto>
- {
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/Api/IResourceRequestHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Api/IResourceRequestHandler.cs b/lang/cs/Org.Apache.REEF.Common/Api/IResourceRequestHandler.cs
deleted file mode 100644
index 4668f7a..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Api/IResourceRequestHandler.cs
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
-
-namespace Org.Apache.REEF.Common.Api
-{
- [Obsolete("Driver core logic no longer needed in.NET")]
- public interface IResourceRequestHandler : IObserver<ResourceRequestProto>
- {
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/ClientJobStatusHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/ClientJobStatusHandler.cs b/lang/cs/Org.Apache.REEF.Common/ClientJobStatusHandler.cs
deleted file mode 100644
index 05e4669..0000000
--- a/lang/cs/Org.Apache.REEF.Common/ClientJobStatusHandler.cs
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
-using Org.Apache.REEF.Utilities;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Time;
-using Org.Apache.REEF.Wake.Time.Event;
-
-namespace Org.Apache.REEF.Common
-{
- [Obsolete("Driver core logic no longer needed in.NET")]
- public class ClientJobStatusHandler : IJobMessageObserver, IObserver<StartTime>
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(ClientJobStatusHandler));
-
- private readonly IClock _clock;
-
- private readonly string _jobId;
-
- private readonly IObserver<JobStatusProto> _jobStatusHandler;
-
- private readonly IDisposable _jobControlChannel;
-
- State _state = State.INIT;
-
- public ClientJobStatusHandler(
- IRemoteManager<IRemoteMessage<REEFMessage>> remoteManager,
- IClock clock,
- IObserver<JobControlProto> jobControlHandler,
- string jobId,
- string clientRID)
- {
- _clock = clock;
- _jobId = jobId;
- _jobStatusHandler = null;
- _jobControlChannel = null;
- //_jobStatusHandler = remoteManager.GetRemoteObserver()
- //_jobControlChannel = remoteManager.RegisterObserver()
- }
-
- public void Dispose(Optional<Exception> e)
- {
- try
- {
- if (e.IsPresent())
- {
- OnError(e.Value);
- }
- else
- {
- JobStatusProto proto = new JobStatusProto();
- proto.identifier = _jobId;
- proto.state = State.DONE;
- Send(proto);
- }
- }
- catch (Exception ex)
- {
- Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Warning, "Error closing ClientJobStatusHandler", LOGGER);
- }
-
- try
- {
- _jobControlChannel.Dispose();
- }
- catch (Exception ex)
- {
- Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Warning, "Error closing jobControlChannel", LOGGER);
- }
- }
-
- public void OnNext(byte[] value)
- {
- LOGGER.Log(Level.Info, "Job message from {0}" + _jobId);
- SendInit();
- JobStatusProto proto = new JobStatusProto();
- proto.identifier = _jobId;
- proto.state = State.RUNNING;
- proto.message = value;
- Send(proto);
- }
-
- public void OnNext(StartTime value)
- {
- LOGGER.Log(Level.Info, "StartTime:" + value);
- SendInit();
- }
-
- public void OnError(Exception error)
- {
- LOGGER.Log(Level.Error, "job excemption", error);
- JobStatusProto proto = new JobStatusProto();
- proto.identifier = _jobId;
- proto.state = State.FAILED;
- proto.exception = ByteUtilities.StringToByteArrays(error.Message);
- _clock.Dispose();
- }
-
- public void OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- private void Send(JobStatusProto status)
- {
- LOGGER.Log(Level.Info, "Sending job status " + status);
- _jobStatusHandler.OnNext(status);
- }
-
- private void SendInit()
- {
- if (_state == State.INIT)
- {
- JobStatusProto proto = new JobStatusProto();
- proto.identifier = _jobId;
- proto.state = State.INIT;
- Send(proto);
- _state = State.RUNNING;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/EvaluatorHeartBeatSanityChecker.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/EvaluatorHeartBeatSanityChecker.cs b/lang/cs/Org.Apache.REEF.Common/EvaluatorHeartBeatSanityChecker.cs
deleted file mode 100644
index c8dea3b..0000000
--- a/lang/cs/Org.Apache.REEF.Common/EvaluatorHeartBeatSanityChecker.cs
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Globalization;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Common
-{
- [Obsolete("Driver core logic no longer needed in.NET")]
- public class EvaluatorHeartBeatSanityChecker
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorHeartBeatSanityChecker));
-
- readonly Dictionary<string, long> _timeStamps = new Dictionary<string, long>();
-
- public void check(string id, long timeStamp)
- {
- lock (this)
- {
- if (_timeStamps.ContainsKey(id))
- {
- long oldTimeStamp = _timeStamps[id];
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "TIMESTAMP CHECKER: id [{0}], old timestamp [{1}], new timestamp [{2}]", id, oldTimeStamp, timeStamp));
- if (oldTimeStamp > timeStamp)
- {
- string msg = string.Format(
- CultureInfo.InvariantCulture,
- "Received an old heartbeat with timestamp [{0}] while timestamp [{1}] was received earlier",
- oldTimeStamp,
- timeStamp);
- Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(msg), LOGGER);
- }
- }
- _timeStamps.Add(id, timeStamp);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 7bd929a..63843c4 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -54,9 +54,6 @@ under the License.
<Compile Include="Api\AbstractFailure.cs" />
<Compile Include="Api\IAbstractFailure.cs" />
<Compile Include="Api\IFailure.cs" />
- <Compile Include="Api\IResourceLaunchHandler.cs" />
- <Compile Include="Api\IResourceReleaseHandler.cs" />
- <Compile Include="Api\IResourceRequestHandler.cs" />
<Compile Include="Avro\AvroDriverInfo.cs" />
<Compile Include="Avro\AvroHttpRequest.cs" />
<Compile Include="Avro\AvroHttpSerializer.cs" />
@@ -72,13 +69,11 @@ under the License.
<Compile Include="Catalog\NodeDescriptorImpl.cs" />
<Compile Include="Catalog\RackDescriptorImpl.cs" />
<Compile Include="Catalog\ResourceCatalogImpl.cs" />
- <Compile Include="ClientJobStatusHandler.cs" />
<Compile Include="Constants.cs" />
<Compile Include="Context\ContextMessage.cs" />
<Compile Include="Context\IContextMessage.cs" />
<Compile Include="Context\IContextMessageHandler.cs" />
<Compile Include="Context\IContextMessageSource.cs" />
- <Compile Include="EvaluatorHeartBeatSanityChecker.cs" />
<Compile Include="Evaluator\DefaultLocalHttpDriverConnection.cs" />
<Compile Include="Evaluator\DefaultYarnClusterHttpDriverConnection.cs" />
<Compile Include="Evaluator\DefaultYarnOneBoxHttpDriverConnection.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs b/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs
deleted file mode 100644
index 09f4e21..0000000
--- a/lang/cs/Org.Apache.REEF.Driver/ClientManager.cs
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
-
-// TODO
-namespace Org.Apache.REEF.Driver
-{
- [Obsolete("Driver core logic no longer needed in.NET")]
- public class ClientManager : IObserver<JobControlProto>
- {
- public void OnNext(JobControlProto value)
- {
- throw new NotImplementedException();
- }
-
- public void OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- public void OnCompleted()
- {
- throw new NotImplementedException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/Context/EvaluatorContext.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Context/EvaluatorContext.cs b/lang/cs/Org.Apache.REEF.Driver/Context/EvaluatorContext.cs
deleted file mode 100644
index 32009de..0000000
--- a/lang/cs/Org.Apache.REEF.Driver/Context/EvaluatorContext.cs
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Globalization;
-using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
-using Org.Apache.REEF.Driver.Bridge.Events;
-using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Driver.Context
-{
- [Obsolete("Driver core logic no longer needed in.NET")]
- public class EvaluatorContext : IActiveContext
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorContext));
-
- private readonly string _identifier;
-
- private readonly Optional<string> _parentId;
-
- private readonly EvaluatorManager _evaluatorManager;
-
- private bool _disposed = false;
-
- public EvaluatorContext(EvaluatorManager evaluatorManager, string id, Optional<string> parentId)
- {
- _identifier = id;
- _parentId = parentId;
- _evaluatorManager = evaluatorManager;
- }
-
- public string Id
- {
- get
- {
- return _identifier;
- }
-
- set
- {
- }
- }
-
- public string EvaluatorId
- {
- get
- {
- return _evaluatorManager.Id;
- }
-
- set
- {
- }
- }
-
- public Optional<string> ParentId
- {
- get
- {
- return _parentId;
- }
-
- set
- {
- }
- }
-
- public IEvaluatorDescriptor EvaluatorDescriptor
- {
- get
- {
- return _evaluatorManager.EvaluatorDescriptor;
- }
-
- set
- {
- }
- }
-
- public void Dispose()
- {
- if (_disposed)
- {
- var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Active context [{0}] already closed", _identifier));
- Exceptions.Throw(e, LOGGER);
- }
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Submit close context: RunningEvaluator id [{0}] for context id [{1}]", EvaluatorId, Id));
- RemoveContextProto removeContextProto = new RemoveContextProto();
- removeContextProto.context_id = Id;
- ContextControlProto contextControlProto = new ContextControlProto();
- contextControlProto.remove_context = removeContextProto;
- _evaluatorManager.Handle(contextControlProto);
- _disposed = true;
- }
-
- public ClosedContext GetClosedContext(IActiveContext parentContext)
- {
- //return new ClosedContext(parentContext, EvaluatorId, Id, ParentId, EvaluatorDescriptor);
- throw new NotImplementedException();
- }
-
- public FailedContext GetFailedContext(Optional<IActiveContext> parentContext, Exception cause)
- {
- //return new FailedContext(parentContext, Id, cause, EvaluatorId, ParentId, EvaluatorDescriptor);
- throw new NotImplementedException();
- }
-
- public void SubmitTask(IConfiguration taskConf)
- {
- throw new NotImplementedException();
- }
-
- public void SubmitContext(IConfiguration contextConfiguration)
- {
- throw new NotImplementedException();
- }
-
- public void SubmitContextAndService(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
- {
- throw new NotImplementedException();
- }
-
- public void SendMessage(byte[] message)
- {
- throw new NotImplementedException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs b/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs
deleted file mode 100644
index a3b53ed..0000000
--- a/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs
+++ /dev/null
@@ -1,537 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Globalization;
-using Org.Apache.REEF.Common;
-using Org.Apache.REEF.Common.Api;
-using Org.Apache.REEF.Common.Catalog;
-using Org.Apache.REEF.Common.Evaluator;
-using Org.Apache.REEF.Common.Exceptions;
-using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
-using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Time;
-using Org.Apache.REEF.Wake.Time.Runtime.Event;
-
-namespace Org.Apache.REEF.Driver
-{
- [Obsolete("Driver core logic no longer needed in.NET")]
- public class DriverManager :
- IEvaluatorRequestor,
- IObserver<RuntimeStatusProto>,
- IObserver<ResourceStatusProto>,
- IObserver<ResourceAllocationProto>,
- IObserver<NodeDescriptorProto>,
- IObserver<RuntimeStart>,
- IObserver<RuntimeStop>,
- IObserver<IdleClock>
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(DriverManager));
-
- private readonly IInjector _injector;
-
- private readonly IInjectionFuture<IClock> _clockFuture;
-
- private readonly ResourceCatalogImpl _resourceCatalog;
-
- private IInjectionFuture<IResourceRequestHandler> _futureResourceRequestHandler;
-
- private readonly Dictionary<string, EvaluatorManager> _evaluators = new Dictionary<string, EvaluatorManager>();
-
- private readonly EvaluatorHeartBeatSanityChecker _sanityChecker = new EvaluatorHeartBeatSanityChecker();
-
- private readonly ClientJobStatusHandler _clientJobStatusHandler;
-
- private readonly IDisposable _heartbeatConnectionChannel;
-
- private readonly IDisposable _errorChannel;
-
- private readonly IObserver<RuntimeErrorProto> _runtimeErrorHandler;
-
- public DriverManager(
- IInjector injector,
- ResourceCatalogImpl resourceCatalog,
- IRemoteManager<REEFMessage> remoteManager,
- IInjectionFuture<IClock> clockFuture,
- IInjectionFuture<IResourceRequestHandler> futureResourceRequestHandler,
- ClientJobStatusHandler clientJobStatusHandler,
- string clientRId)
- {
- _injector = injector;
- _clockFuture = clockFuture;
- _resourceCatalog = resourceCatalog;
- _futureResourceRequestHandler = futureResourceRequestHandler;
- _clientJobStatusHandler = clientJobStatusHandler;
-
- _heartbeatConnectionChannel = null;
- _errorChannel = null;
- _runtimeErrorHandler = null;
- LOGGER.Log(Level.Info, "DriverManager instantiated");
- }
-
- public IResourceCatalog ResourceCatalog
- {
- get
- {
- return _resourceCatalog;
- }
-
- set
- {
- }
- }
-
- private RuntimeStatusProto _runtimeStatusProto
- {
- get
- {
- RuntimeStatusProto proto = new RuntimeStatusProto();
- proto.state = State.INIT;
- proto.name = "REEF";
- proto.outstanding_container_requests = 0;
- return proto;
- }
-
- set
- {
- _runtimeStatusProto = value;
- }
- }
-
- public void Submit(IEvaluatorRequest request)
- {
- LOGGER.Log(Level.Info, "Got an EvaluatorRequest");
- ResourceRequestProto proto = new ResourceRequestProto();
- //TODO: request.size deprecated should use megabytes instead
- //switch (request.Size)
- //{
- // case EvaluatorRequest.EvaluatorSize.SMALL:
- // proto.resource_size = SIZE.SMALL;
- // break;
- // case EvaluatorRequest.EvaluatorSize.MEDIUM:
- // proto.resource_size = SIZE.MEDIUM;
- // break;
- // case EvaluatorRequest.EvaluatorSize.LARGE:
- // proto.resource_size = SIZE.LARGE;
- // break;
- // case EvaluatorRequest.EvaluatorSize.XLARGE:
- // proto.resource_size = SIZE.XLARGE;
- // break;
- // default:
- // throw new InvalidOperationException("invalid request size" + request.Size);
- //}
- proto.resource_count = request.Number;
- if (request.MemoryMegaBytes > 0)
- {
- proto.memory_size = request.MemoryMegaBytes;
- }
-
- //final ResourceCatalog.Descriptor descriptor = req.getDescriptor();
- //if (descriptor != null) {
- // if (descriptor instanceof RackDescriptor) {
- // request.addRackName(descriptor.getName());
- // } else if (descriptor instanceof NodeDescriptor) {
- // request.addNodeName(descriptor.getName());
- // }
- //}
-
- //_futureResourceRequestHandler.Get().OnNext(proto);
- }
-
- public void Release(EvaluatorManager evaluatorManager)
- {
- lock (this)
- {
- string evaluatorManagerId = evaluatorManager.Id;
- if (_evaluators.ContainsKey(evaluatorManagerId))
- {
- _evaluators.Remove(evaluatorManagerId);
- }
- else
- {
- var e = new InvalidOperationException("Trying to remove an unknown evaluator manager with id " + evaluatorManagerId);
- Exceptions.Throw(e, LOGGER);
- }
- }
- }
-
- /// <summary>
- /// This handles runtime error occurs on the evaluator
- /// </summary>
- /// <param name="runtimeErrorProto"></param>
- public void Handle(RuntimeErrorProto runtimeErrorProto)
- {
- FailedRuntime error = new FailedRuntime(runtimeErrorProto);
- LOGGER.Log(Level.Warning, "Runtime error:" + error);
-
- EvaluatorException evaluatorException = error.Cause != null
- ? new EvaluatorException(error.Id, error.Cause.Value)
- : new EvaluatorException(error.Id, "Runtime error");
- EvaluatorManager evaluatorManager = null;
- lock (_evaluators)
- {
- if (_evaluators.ContainsKey(error.Id))
- {
- evaluatorManager = _evaluators[error.Id];
- }
- else
- {
- LOGGER.Log(Level.Warning, "Unknown evaluator runtime error: " + error.Cause);
- }
- }
- if (null != evaluatorManager)
- {
- evaluatorManager.Handle(evaluatorException);
- }
- }
-
- /// <summary>
- /// A RuntimeStatusProto comes from the ResourceManager layer indicating its current status
- /// </summary>
- /// <param name="runtimeStatusProto"></param>
- public void OnNext(RuntimeStatusProto runtimeStatusProto)
- {
- Handle(runtimeStatusProto);
- }
-
- /// <summary>
- /// A ResourceStatusProto message comes from the ResourceManager layer to indicate what it thinks
- /// about the current state of a given resource. Ideally, we should think the same thing.
- /// </summary>
- /// <param name="resourceStatusProto"></param>
- public void OnNext(ResourceStatusProto resourceStatusProto)
- {
- Handle(resourceStatusProto);
- }
-
- /// <summary>
- /// A ResourceAllocationProto indicates a resource allocation given by the ResourceManager layer.
- /// </summary>
- /// <param name="resourceAllocationProto"></param>
- public void OnNext(ResourceAllocationProto resourceAllocationProto)
- {
- Handle(resourceAllocationProto);
- }
-
- /// <summary>
- /// A NodeDescriptorProto defines a new node in the cluster. We should add this to the resource catalog
- /// so that clients can make resource requests against it.
- /// </summary>
- /// <param name="nodeDescriptorProto"></param>
- public void OnNext(NodeDescriptorProto nodeDescriptorProto)
- {
- _resourceCatalog.Handle(nodeDescriptorProto);
- }
-
- /// <summary>
- /// This EventHandler is subscribed to the StartTime event of the Clock statically. It therefore provides the entrance
- /// point to REEF.
- /// </summary>
- /// <param name="runtimeStart"></param>
- public void OnNext(RuntimeStart runtimeStart)
- {
- LOGGER.Log(Level.Info, "RuntimeStart: " + runtimeStart);
- _runtimeStatusProto = new RuntimeStatusProto();
- _runtimeStatusProto.state = State.RUNNING;
- _runtimeStatusProto.name = "REEF";
- _runtimeStatusProto.outstanding_container_requests = 0;
- }
-
- /// <summary>
- /// Handles RuntimeStop
- /// </summary>
- /// <param name="runtimeStop"></param>
- public void OnNext(RuntimeStop runtimeStop)
- {
- LOGGER.Log(Level.Info, "RuntimeStop: " + runtimeStop);
- if (runtimeStop.Exception != null)
- {
- string exceptionMessage = runtimeStop.Exception.Message;
- LOGGER.Log(Level.Warning, "Sending runtime error:" + exceptionMessage);
- RuntimeErrorProto runtimeErrorProto = new RuntimeErrorProto();
- runtimeErrorProto.message = exceptionMessage;
- runtimeErrorProto.exception = ByteUtilities.StringToByteArrays(exceptionMessage);
- runtimeErrorProto.name = "REEF";
- _runtimeErrorHandler.OnNext(runtimeErrorProto);
-
- LOGGER.Log(Level.Warning, "DONE Sending runtime error: " + exceptionMessage);
- }
-
- lock (_evaluators)
- {
- foreach (EvaluatorManager evaluatorManager in _evaluators.Values)
- {
- LOGGER.Log(Level.Warning, "Unclean shutdown of evaluator: " + evaluatorManager.Id);
- evaluatorManager.Dispose();
- }
- }
-
- try
- {
- _heartbeatConnectionChannel.Dispose();
- _errorChannel.Dispose();
- Optional<Exception> e = runtimeStop.Exception != null ?
- Optional<Exception>.Of(runtimeStop.Exception) : Optional<Exception>.Empty();
- _clientJobStatusHandler.Dispose(e);
-
- LOGGER.Log(Level.Info, "driver manager closed");
- }
- catch (Exception e)
- {
- Exceptions.Caught(e, Level.Error, "Error disposing Driver manager", LOGGER);
- Exceptions.Throw(new InvalidOperationException("Cannot dispose driver manager"), LOGGER);
- }
- }
-
- public void OnNext(IdleClock value)
- {
- string message = string.Format(
- CultureInfo.InvariantCulture,
- "IdleClock: [{0}], RuntimeState [{1}], Outstanding container requests [{2}], Container allocation count[{3}]",
- value + Environment.NewLine,
- _runtimeStatusProto.state + Environment.NewLine,
- _runtimeStatusProto.outstanding_container_requests + Environment.NewLine,
- _runtimeStatusProto.container_allocation.Count);
- LOGGER.Log(Level.Info, message);
-
- lock (_evaluators)
- {
- if (_runtimeStatusProto.state == State.RUNNING
- && _runtimeStatusProto.outstanding_container_requests == 0
- && _runtimeStatusProto.container_allocation.Count == 0)
- {
- LOGGER.Log(Level.Info, "Idle runtime shutdown");
- _clockFuture.Get().Dispose();
- }
- }
- }
-
- void IObserver<IdleClock>.OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- void IObserver<IdleClock>.OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- void IObserver<RuntimeStop>.OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- void IObserver<RuntimeStop>.OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- void IObserver<RuntimeStart>.OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- void IObserver<RuntimeStart>.OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- void IObserver<NodeDescriptorProto>.OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- void IObserver<NodeDescriptorProto>.OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- void IObserver<ResourceAllocationProto>.OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- void IObserver<ResourceAllocationProto>.OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- void IObserver<ResourceStatusProto>.OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- void IObserver<ResourceStatusProto>.OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- void IObserver<RuntimeStatusProto>.OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- void IObserver<RuntimeStatusProto>.OnCompleted()
- {
- throw new NotImplementedException();
- }
-
- /// <summary>
- /// Something went wrong at the runtime layer (either driver or evaluator). This
- /// method simply forwards the RuntimeErrorProto to the client via the RuntimeErrorHandler.
- /// </summary>
- /// <param name="runtimeErrorProto"></param>
- private void Fail(RuntimeErrorProto runtimeErrorProto)
- {
- _runtimeErrorHandler.OnNext(runtimeErrorProto);
- _clockFuture.Get().Dispose();
- }
-
- /// <summary>
- /// Helper method to create a new EvaluatorManager instance
- /// </summary>
- /// <param name="id">identifier of the Evaluator</param>
- /// <param name="descriptor"> NodeDescriptor on which the Evaluator executes.</param>
- /// <returns>new EvaluatorManager instance.</returns>
- private EvaluatorManager GetNewEvaluatorManagerInstance(string id, EvaluatorDescriptorImpl descriptor)
- {
- LOGGER.Log(Level.Info, "Creating Evaluator Manager: " + id);
- //TODO bindVolatieParameter
- return (EvaluatorManager)_injector.GetInstance(typeof(EvaluatorManager));
- }
-
- /// <summary>
- /// Receives and routes heartbeats from Evaluators.
- /// </summary>
- /// <param name="evaluatorHearBeatProto"></param>
- private void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProto)
- {
- EvaluatorHeartbeatProto heartbeat = evaluatorHearBeatProto.Message;
- EvaluatorStatusProto status = heartbeat.evaluator_status;
- string evaluatorId = status.evaluator_id;
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Heartbeat from Evaluator {0} with state {1} timestamp {2}", evaluatorId, status.state, heartbeat.timestamp));
- _sanityChecker.check(evaluatorId, heartbeat.timestamp);
-
- lock (_evaluators)
- {
- if (_evaluators.ContainsKey(evaluatorId))
- {
- EvaluatorManager evaluatorManager = _evaluators[evaluatorId];
- evaluatorManager.Handle(evaluatorHearBeatProto);
- }
- else
- {
- string msg = "Contact from unkonwn evaluator with id: " + evaluatorId;
- if (heartbeat.evaluator_status != null)
- {
- msg += " with state" + status.state;
- }
- LOGGER.Log(Level.Error, msg);
- Exceptions.Throw(new InvalidOperationException(msg), LOGGER);
- }
- }
- }
-
- /// <summary>
- /// This resource status message comes from the ResourceManager layer; telling me what it thinks
- /// about the state of the resource executing an Evaluator; This method simply passes the message
- /// off to the referenced EvaluatorManager
- /// </summary>
- /// <param name="resourceStatusProto"></param>
- private void Handle(ResourceStatusProto resourceStatusProto)
- {
- lock (_evaluators)
- {
- if (_evaluators.ContainsKey(resourceStatusProto.identifier))
- {
- EvaluatorManager evaluatorManager = _evaluators[resourceStatusProto.identifier];
- evaluatorManager.Handle(resourceStatusProto);
- }
- else
- {
- var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown resource status from evaluator {0} with state {1}", resourceStatusProto.identifier, resourceStatusProto.state));
- Exceptions.Throw(e, LOGGER);
- }
- }
- }
-
- /// <summary>
- /// This method handles resource allocations by creating a new EvaluatorManager instance.
- /// </summary>
- /// <param name="resourceAllocationProto"></param>
- private void Handle(ResourceAllocationProto resourceAllocationProto)
- {
- lock (_evaluators)
- {
- try
- {
- INodeDescriptor nodeDescriptor = _resourceCatalog.GetNode(resourceAllocationProto.node_id);
- if (nodeDescriptor == null)
- {
- Exceptions.Throw(new InvalidOperationException("Unknown resurce: " + resourceAllocationProto.node_id), LOGGER);
- }
- EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, EvaluatorType.UNDECIDED, resourceAllocationProto.resource_memory, resourceAllocationProto.virtual_cores);
- LOGGER.Log(Level.Info, "Resource allocation: new evaluator id: " + resourceAllocationProto.identifier);
- EvaluatorManager evaluatorManager = GetNewEvaluatorManagerInstance(resourceAllocationProto.identifier, evaluatorDescriptor);
- _evaluators.Add(resourceAllocationProto.identifier, evaluatorManager);
- }
- catch (Exception e)
- {
- Exceptions.Caught(e, Level.Error, LOGGER);
- Exceptions.Throw(new InvalidOperationException("Error handling resourceAllocationProto."), LOGGER);
- }
- }
- }
-
- private void Handle(RuntimeStatusProto runtimeStatusProto)
- {
- State runtimeState = runtimeStatusProto.state;
- LOGGER.Log(Level.Info, "Runtime status: " + runtimeStatusProto.state);
-
- switch (runtimeState)
- {
- case State.FAILED:
- Fail(runtimeStatusProto.error);
- break;
- case State.DONE:
- _clockFuture.Get().Dispose();
- break;
- case State.RUNNING:
- lock (_evaluators)
- {
- _runtimeStatusProto = runtimeStatusProto;
- if (_clockFuture.Get().IsIdle()
- && runtimeStatusProto.outstanding_container_requests == 0
- && runtimeStatusProto.container_allocation.Count == 0)
- {
- _clockFuture.Get().Dispose();
- }
- }
- break;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs
deleted file mode 100644
index 9d7f8ad..0000000
--- a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Common;
-using Org.Apache.REEF.Common.Catalog;
-using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Util;
-
-namespace Org.Apache.REEF.Driver
-{
- [Obsolete("Driver core logic no longer needed in.NET")]
- public class DriverRuntimeConfiguration : ConfigurationModuleBuilder
- {
- public static ConfigurationModule ConfigurationModule
- {
- get
- {
- return new DriverRuntimeConfiguration()
- // Resource Catalog
- .BindImplementation(GenericType<IResourceCatalog>.Class, GenericType<ResourceCatalogImpl>.Class)
-
- // JobMessageObserver
- //.BindImplementation(GenericType<IEvaluatorRequestor>.Class, GenericType<DriverManager>.Class)
- .BindImplementation(GenericType<IJobMessageObserver>.Class, GenericType<ClientJobStatusHandler>.Class)
-
- // JobMessageObserver Wake event handler bindings
- .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobMessageHandler>.Class, GenericType<ClientJobStatusHandler>.Class)
- .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobExceptionHandler>.Class, GenericType<ClientJobStatusHandler>.Class)
-
- // Client manager
- .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobControlHandler>.Class, GenericType<ClientManager>.Class)
-
- // Bind the runtime parameters
- //.BindNamedParameter(GenericType<RuntimeParameters.NodeDescriptorHandler>.Class, GenericType<DriverManager>.Class)
- //.BindNamedParameter(GenericType<RuntimeParameters.ResourceAllocationHandler>.Class, GenericType<DriverManager>.Class)
- //.BindNamedParameter(GenericType<RuntimeParameters.ResourceStatusHandler>.Class, GenericType<DriverManager>.Class)
- //.BindNamedParameter(GenericType<RuntimeParameters.RuntimeStatusHandler>.Class, GenericType<DriverManager>.Class)
-
- // Bind to the Clock
- //.BindSetEntry(GenericType<IClock.RuntimeStopHandler>.Class, GenericType<DriverManager>.Class)
- .Build();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs
deleted file mode 100644
index 9ba1820..0000000
--- a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Common;
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Driver
-{
- [Obsolete("Driver core logic no longer needed in.NET")]
- public class DriverRuntimeConfigurationOptions
- {
- [NamedParameter(documentation: "Job message handler (see ClientJobStatusHandler)")]
- public class JobMessageHandler : Name<ClientJobStatusHandler>
- {
- }
-
- [NamedParameter(documentation: "Job exception handler (see ClientJobStatusHandler)")]
- public class JobExceptionHandler : Name<ClientJobStatusHandler>
- {
- }
-
- [NamedParameter(documentation: "Called when a job control message is received by the client.")]
- public class JobControlHandler : Name<ClientManager>
- {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs b/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs
deleted file mode 100644
index e28373d..0000000
--- a/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs
+++ /dev/null
@@ -1,653 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Globalization;
-using System.Linq;
-using System.Text;
-using Org.Apache.REEF.Common.Api;
-using Org.Apache.REEF.Common.Catalog;
-using Org.Apache.REEF.Common.Evaluator;
-using Org.Apache.REEF.Common.Exceptions;
-using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
-using Org.Apache.REEF.Driver.Bridge.Events;
-using Org.Apache.REEF.Driver.Context;
-using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Driver.Task;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Time;
-using TaskMessage = Org.Apache.REEF.Common.Tasks.TaskMessage;
-
-namespace Org.Apache.REEF.Driver
-{
- /// <summary>
- /// Manages a single Evaluator instance including all lifecycle instances:
- /// (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
- /// A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager.
- /// The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this heartbeat channel.
- /// A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime.
- /// The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
- /// control information (e.g., shutdown, suspend).* Manages a single Evaluator instance including all lifecycle instances:
- /// (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
- /// A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager.
- /// The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this heartbeat channel.
- /// A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime.
- /// The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate control information (e.g., shutdown, suspend).
- /// </summary>
- [Obsolete("Driver core logic no longer needed in.NET")]
- public class EvaluatorManager : IDisposable, IIdentifiable
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorManager));
-
- private STATE _state = STATE.ALLOCATED;
-
- private IClock _clock;
-
- // TODO
- // private final RemoteManager remoteManager;
- private readonly DriverManager _driverManager;
-
- private readonly IResourceReleaseHandler _resourceReleaseHandler;
-
- private readonly IResourceLaunchHandler _resourceLaunchHandler;
-
- private readonly EvaluatorDescriptorImpl _evaluatorDescriptor;
-
- private readonly string _evaluatorId;
-
- private readonly IList<EvaluatorContext> _activeContexts = new List<EvaluatorContext>();
-
- private readonly HashSet<string> _activeContextIds = new HashSet<string>();
-
- private IRunningTask _runningTask = null;
-
- private readonly IObserver<EvaluatorControlProto> _evaluatorControlHandler = null;
-
- private bool _isResourceReleased = false;
-
- //TODO
- //private final DispatchingEStage dispatcher;
- private EvaluatorType _type = EvaluatorType.CLR;
-
- public EvaluatorManager(
- IClock clock,
- //RemoteManager remoteManager,
- DriverManager driverManager,
- IResourceReleaseHandler resourceReleaseHandler,
- IResourceLaunchHandler resourceLaunchHandler,
- //REEFErrorHandler errorHandler,
- string evaluatorId,
- EvaluatorDescriptorImpl evaluatorDescriptor,
- ISet<IObservable<IActiveContext>> activeContextEventHandler,
- ISet<IObservable<IClosedContext>> closedContextEventHandlers,
- ISet<IObservable<FailedContext>> failedContextEventHandlers,
- ISet<IObservable<ContextMessage>> contextMessageHandlers,
- ISet<IObservable<IRunningTask>> runningTaskEventHandlers,
- ISet<IObservable<ICompletedTask>> completedTaskEventHandlers,
- ISet<IObservable<ISuspendedTask>> suspendedTaskEventHandlers,
- ISet<IObservable<TaskMessage>> taskMessageEventHandlers,
- ISet<IObservable<FailedTask>> taskExceptionEventHandlers,
- ISet<IObservable<IAllocatedEvaluator>> allocatedEvaluatorEventHandlers,
- ISet<IObservable<IFailedEvaluator>> failedEvaluatorHandlers,
- ISet<IObservable<ICompletedEvaluator>> completedEvaluatorHandlers)
- {
- _clock = clock;
- //_remoteManager = remoteManager;
- _driverManager = driverManager;
- _resourceReleaseHandler = resourceReleaseHandler;
- _resourceLaunchHandler = resourceLaunchHandler;
- _evaluatorId = evaluatorId;
- _evaluatorDescriptor = evaluatorDescriptor;
-
- //this.dispatcher = new DispatchingEStage(errorHandler, 16); // 16 threads
-
- //this.dispatcher.register(ActiveContext.class, activeContextEventHandlers);
- //this.dispatcher.register(ClosedContext.class, closedContextEventHandlers);
- //this.dispatcher.register(FailedContext.class, failedContextEventHandlers);
- //this.dispatcher.register(ContextMessage.class, contextMessageHandlers);
-
- //this.dispatcher.register(RunningTask.class, runningTaskEventHandlers);
- //this.dispatcher.register(CompletedTask.class, completedTaskEventHandlers);
- //this.dispatcher.register(SuspendedTask.class, suspendedTaskEventHandlers);
- //this.dispatcher.register(TaskMessage.class, taskMessageEventHandlers);
- //this.dispatcher.register(FailedTask.class, taskExceptionEventHandlers);
-
- //this.dispatcher.register(FailedEvaluator.class, failedEvaluatorHandlers);
- //this.dispatcher.register(CompletedEvaluator.class, completedEvaluatorHandlers);
- //this.dispatcher.register(AllocatedEvaluator.class, allocatedEvaluatorEventHandlers);
-
- //this.dispatcher.onNext(AllocatedEvaluator.class,
- // new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier()));
- }
-
- /// <summary>
- /// Various states that the EvaluatorManager could be in. The EvaluatorManager is created when a resource has been allocated by the ResourceManager.
- /// </summary>
- public enum STATE
- {
- ALLOCATED, // initial state
- SUBMITTED, // client called AllocatedEvaluator.submitTask() and we're waiting for first contact
- RUNNING, // first contact received, all communication channels established, Evaluator sent to client.
- DONE, // clean shutdown
- FAILED, // some failure occurred.
- KILLED // unclean shutdown
- }
-
- public IEvaluatorDescriptor EvaluatorDescriptor
- {
- get
- {
- return _evaluatorDescriptor;
- }
- }
-
- public INodeDescriptor NodeDescriptor
- {
- get
- {
- return EvaluatorDescriptor.NodeDescriptor;
- }
- }
-
- public IRunningTask RunningTask
- {
- get
- {
- lock (_evaluatorDescriptor)
- {
- return _runningTask;
- }
- }
- }
-
- public string Id
- {
- get
- {
- return _evaluatorId;
- }
-
- set
- {
- }
- }
-
- public EvaluatorType Type
- {
- get
- {
- return _type;
- }
-
- set
- {
- _type = value;
- _evaluatorDescriptor.EvaluatorType = value;
- }
- }
-
- public void Dispose()
- {
- lock (_evaluatorDescriptor)
- {
- if (_state == STATE.RUNNING)
- {
- LOGGER.Log(Level.Warning, "Dirty shutdown of running evaluator :" + Id);
- try
- {
- // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
- EvaluatorControlProto proto = new EvaluatorControlProto();
- proto.timestamp = DateTime.Now.Ticks;
- proto.identifier = Id;
- proto.kill_evaluator = new KillEvaluatorProto();
- Handle(proto);
- }
- finally
- {
- _state = STATE.KILLED;
- }
- }
- }
-
- if (!_isResourceReleased)
- {
- try
- {
- // We need to wait awhile before returning the container to the RM in order to
- // give the EvaluatorRuntime (and Launcher) time to cleanly exit.
-
- // this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
- //@Override
- //public void onNext(final Alarm alarm) {
- // EvaluatorManager.this.resourceReleaseHandler.onNext(
- // DriverRuntimeProtocol.ResourceReleaseProto.newBuilder()
- // .setIdentifier(EvaluatorManager.this.evaluatorId).build());
- }
- catch (Exception e)
- {
- Exceptions.Caught(e, Level.Warning, "Force resource release because the client closed the clock.", LOGGER);
- ResourceReleaseProto proto = new ResourceReleaseProto();
- proto.identifier = _evaluatorId;
- _resourceReleaseHandler.OnNext(proto);
- }
- finally
- {
- _isResourceReleased = true;
- _driverManager.Release(this);
- }
- }
- }
-
- /// <summary>
- /// EvaluatorException will trigger is FailedEvaluator and state transition to FAILED
- /// </summary>
- /// <param name="exception"></param>
- public void Handle(EvaluatorException exception)
- {
- lock (_evaluatorDescriptor)
- {
- if (_state >= STATE.DONE)
- {
- return;
- }
- LOGGER.Log(Level.Warning, "Failed Evaluator: " + Id + exception.Message);
- try
- {
- IList<FailedContext> failedContexts = new List<FailedContext>();
- IList<EvaluatorContext> activeContexts = new List<EvaluatorContext>(_activeContexts);
- activeContexts = activeContexts.Reverse().ToList();
- foreach (EvaluatorContext context in activeContexts)
- {
- Optional<IActiveContext> parentContext = context.ParentId.IsPresent()
- ? Optional<IActiveContext>.Of(GetEvaluatorContext(context.ParentId.Value))
- : Optional<IActiveContext>.Empty();
- failedContexts.Add(context.GetFailedContext(parentContext, exception));
- }
-
- //Optional<FailedTask> failedTask = _runningTask != null ?
- // Optional<FailedTask>.Of(new FailedTask(_runningTask.Id, exception)) : Optional<FailedTask>.Empty();
- //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString());
- //this.dispatcher.onNext(FailedEvaluator.class, new FailedEvaluatorImpl(
- //exception, failedContextList, failedTaskOptional, this.evaluatorId));
- }
- catch (Exception e)
- {
- Exceptions.CaughtAndThrow(e, Level.Error, "Exception while handling FailedEvaluator.", LOGGER);
- }
- finally
- {
- _state = STATE.FAILED;
- Dispose();
- }
- }
- }
-
- public void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProtoMessage)
- {
- lock (_evaluatorDescriptor)
- {
- EvaluatorHeartbeatProto heartbeatProto = evaluatorHearBeatProtoMessage.Message;
- if (heartbeatProto.evaluator_status != null)
- {
- EvaluatorStatusProto status = heartbeatProto.evaluator_status;
- if (status.error != null)
- {
- Handle(new EvaluatorException(Id, ByteUtilities.ByteArrarysToString(status.error)));
- return;
- }
- else if (_state == STATE.SUBMITTED)
- {
- string evaluatorRId = evaluatorHearBeatProtoMessage.Identifier.ToString();
- LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorRId);
- // TODO
- // _evaluatorControlHandler = _remoteManager.getHandler(evaluatorRID, EvaluatorRuntimeProtocol.EvaluatorControlProto.class);
- _state = STATE.RUNNING;
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} is running", _evaluatorId));
- }
- }
-
- LOGGER.Log(Level.Info, "Evaluator heartbeat: " + heartbeatProto);
-
- EvaluatorStatusProto evaluatorStatusProto = heartbeatProto.evaluator_status;
- foreach (ContextStatusProto contextStatusProto in heartbeatProto.context_status)
- {
- Handle(contextStatusProto, heartbeatProto.task_status != null);
- }
-
- if (heartbeatProto.task_status != null)
- {
- Handle(heartbeatProto.task_status);
- }
-
- if (evaluatorStatusProto.state == State.FAILED)
- {
- _state = STATE.FAILED;
- EvaluatorException e = evaluatorStatusProto.error != null ?
- new EvaluatorException(_evaluatorId, ByteUtilities.ByteArrarysToString(evaluatorStatusProto.error)) :
- new EvaluatorException(_evaluatorId, "unknown cause");
- LOGGER.Log(Level.Warning, "Failed evaluator: " + Id + e.Message);
- Handle(e);
- }
- else if (evaluatorStatusProto.state == State.DONE)
- {
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} done", Id));
- _state = STATE.DONE;
-
- // TODO
- // dispatcher.onNext(CompletedEvaluator.class, new CompletedEvaluator() {
- //@Override
- //public String getId() {
- // return EvaluatorManager.this.evaluatorId;
- Dispose();
- }
- }
- LOGGER.Log(Level.Info, "DONE with evaluator heartbeat");
- }
-
- public void Handle(ResourceLaunchProto resourceLaunchProto)
- {
- lock (_evaluatorDescriptor)
- {
- if (_state == STATE.ALLOCATED)
- {
- _state = STATE.SUBMITTED;
- _resourceLaunchHandler.OnNext(resourceLaunchProto);
- }
- else
- {
- var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Evaluator manager expected {0} state, but instead is in state {1}", STATE.ALLOCATED, _state));
- Exceptions.Throw(e, LOGGER);
- }
- }
- }
-
- /// <summary>
- /// Packages the TaskControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime
- /// </summary>
- /// <param name="contextControlProto"></param>
- public void Handle(ContextControlProto contextControlProto)
- {
- lock (_evaluatorDescriptor)
- {
- LOGGER.Log(Level.Info, "Task control message from " + _evaluatorId);
- EvaluatorControlProto evaluatorControlProto = new EvaluatorControlProto();
- evaluatorControlProto.timestamp = DateTime.Now.Ticks;
- evaluatorControlProto.identifier = Id;
- evaluatorControlProto.context_control = contextControlProto;
-
- Handle(evaluatorControlProto);
- }
- }
-
- /// <summary>
- /// Forward the EvaluatorControlProto to the EvaluatorRuntime
- /// </summary>
- /// <param name="proto"></param>
- public void Handle(EvaluatorControlProto proto)
- {
- lock (_evaluatorDescriptor)
- {
- if (_state == STATE.RUNNING)
- {
- _evaluatorControlHandler.OnNext(proto);
- }
- else
- {
- var e = new InvalidOperationException(
- string.Format(
- CultureInfo.InvariantCulture,
- "Evaluator manager expects to be in {0} state, but instead is in state {1}",
- STATE.RUNNING,
- _state));
- Exceptions.Throw(e, LOGGER);
- }
- }
- }
-
- /// <summary>
- /// Resource status information from the (actual) resource manager.
- /// </summary>
- /// <param name="resourceStatusProto"></param>
- public void Handle(ResourceStatusProto resourceStatusProto)
- {
- lock (_evaluatorDescriptor)
- {
- State resourceState = resourceStatusProto.state;
- LOGGER.Log(Level.Info, "Resource manager state update: " + resourceState);
-
- if (resourceState == State.DONE || resourceState == State.FAILED)
- {
- if (_state < STATE.DONE)
- {
- // something is wrong, I think I'm alive but the resource manager runtime says I'm dead
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.Append(
- string.Format(
- CultureInfo.InvariantCulture,
- "The resource manager informed me that Evaluator {0} is in state {1} but I think I am in {2} state",
- _evaluatorId,
- resourceState,
- _state));
- if (resourceStatusProto.diagnostics != null)
- {
- stringBuilder.Append("Cause: " + resourceStatusProto.diagnostics);
- }
- if (_runningTask != null)
- {
- stringBuilder.Append(
- string.Format(
- CultureInfo.InvariantCulture,
- "Taskruntime {0} did not complete before this evaluator died.",
- _runningTask.Id));
- }
-
- // RM is telling me its DONE/FAILED - assuming it has already released the resources
- _isResourceReleased = true;
- //Handle(new EvaluatorException(_evaluatorId, stringBuilder.ToString(), _runningTask));
- _state = STATE.KILLED;
- }
- }
- }
- }
-
- /// <summary>
- /// Handle a context status update
- /// </summary>
- /// <param name="contextStatusProto"></param>
- /// <param name="notifyClientOnNewActiveContext"></param>
- private void Handle(ContextStatusProto contextStatusProto, bool notifyClientOnNewActiveContext)
- {
- string contextId = contextStatusProto.context_id;
- Optional<string> parentId = contextStatusProto.parent_id != null ?
- Optional<string>.Of(contextStatusProto.parent_id) : Optional<string>.Empty();
- if (ContextStatusProto.State.READY == contextStatusProto.context_state)
- {
- if (!_activeContextIds.Contains(contextId))
- {
- EvaluatorContext evaluatorContext = new EvaluatorContext(this, contextId, parentId);
- AddEvaluatorContext(evaluatorContext);
- if (notifyClientOnNewActiveContext)
- {
- LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext.ToString());
- //TODO
- //dispatcher.onNext(ActiveContext.class, context);
- }
- }
- foreach (ContextStatusProto.ContextMessageProto contextMessageProto in contextStatusProto.context_message)
- {
- byte[] message = contextMessageProto.message;
- string sourceId = contextMessageProto.source_id;
- LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + sourceId + message);
- // this.dispatcher.onNext(ContextMessage.class,
- //new ContextMessageImpl(theMessage, contextID, sourceID));
- }
- }
- else
- {
- if (!_activeContextIds.Contains(contextId))
- {
- if (ContextStatusProto.State.FAIL == contextStatusProto.context_state)
- {
- AddEvaluatorContext(new EvaluatorContext(this, contextId, parentId));
- }
- else
- {
- var e = new InvalidOperationException("unknown context signaling state " + contextStatusProto.context_state);
- Exceptions.Throw(e, LOGGER);
- }
- }
- }
-
- EvaluatorContext context = GetEvaluatorContext(contextId);
- EvaluatorContext parentContext = context.ParentId.IsPresent() ?
- GetEvaluatorContext(context.ParentId.Value) : null;
- RemoveEvaluatorContext(context);
-
- if (ContextStatusProto.State.FAIL == contextStatusProto.context_state)
- {
- // TODO
- Exception reason = new InvalidOperationException(ByteUtilities.ByteArrarysToString(contextStatusProto.error));
- Optional<IActiveContext> optionalParentContext = (null == parentContext) ?
- Optional<IActiveContext>.Empty() : Optional<IActiveContext>.Of(parentContext);
- LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + reason.ToString() + optionalParentContext);
- // TODO
- //this.dispatcher.onNext(FailedContext.class,
- //context.getFailedContext(optionalParentContext, reason));
- }
- else if (ContextStatusProto.State.DONE == contextStatusProto.context_state)
- {
- if (null != parentContext)
- {
- // TODO
- //this.dispatcher.onNext(ClosedContext.class, context.getClosedContext(parentContext));
- }
- else
- {
- LOGGER.Log(Level.Info, "Root context closed. Evaluator closed will trigger final shutdown.");
- }
- }
- else
- {
- var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown context state {0} for context {1}", contextStatusProto.context_state, contextId));
- Exceptions.Throw(e, LOGGER);
- }
- }
-
- /// <summary>
- /// Handle task status messages.
- /// </summary>
- /// <param name="taskStatusProto"></param>
- private void Handle(TaskStatusProto taskStatusProto)
- {
- LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received task {0} status {1}", taskStatusProto.task_id, taskStatusProto.state));
- string taskId = taskStatusProto.task_id;
- string contextId = taskStatusProto.context_id;
- State taskState = taskStatusProto.state;
-
- if (taskState == State.INIT)
- {
- EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
- _runningTask = new RunningTaskImpl(taskId, evaluatorContext);
- // this.dispatcher.onNext(RunningTask.class, this.runningTask);
- }
- else if (taskState == State.SUSPEND)
- {
- EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
- _runningTask = null;
- byte[] message = taskStatusProto.result != null ? taskStatusProto.result : null;
- LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message.ToString());
- //this.dispatcher.onNext(SuspendedTask.class, new SuspendedTaskImpl(evaluatorContext, message, taskId));
- }
- else if (taskState == State.DONE)
- {
- EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
- _runningTask = null;
- byte[] message = taskStatusProto.result != null ? taskStatusProto.result : null;
- LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message.ToString());
- //this.dispatcher.onNext(CompletedTask.class, new CompletedTaskImpl(evaluatorContext, message, taskId));
- }
- else if (taskState == State.FAILED)
- {
- _runningTask = null;
- //EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
- //FailedTask failedTask = taskStatusProto.result != null ?
- // new FailedTask(taskId, ByteUtilities.ByteArrarysToString(taskStatusProto.result), Optional<IActiveContext>.Of(evaluatorContext)) :
- // new FailedTask(taskId, "Failed task: " + taskState, Optional<IActiveContext>.Of(evaluatorContext));
- //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString());
- //this.dispatcher.onNext(FailedTask.class, taskException);
- }
- else if (taskStatusProto.task_message.Count > 0)
- {
- if (_runningTask != null)
- {
- var e = new InvalidOperationException("runningTask must be null when there are multiple task messages");
- Exceptions.Throw(e, LOGGER);
- }
- foreach (TaskStatusProto.TaskMessageProto taskMessageProto in taskStatusProto.task_message)
- {
- LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + taskMessageProto.ToString());
- // this.dispatcher.onNext(TaskMessage.class,
- //new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(),
- // taskId, contextId, taskMessageProto.getSourceId()));
- }
- }
- }
-
- private EvaluatorContext GetEvaluatorContext(string id)
- {
- foreach (EvaluatorContext context in _activeContexts)
- {
- if (context.Id.Equals(id))
- {
- return context;
- }
- var e = new InvalidOperationException("Unknown evaluator context with id " + id);
- Exceptions.Throw(e, LOGGER);
- }
- return null;
- }
-
- private void AddEvaluatorContext(EvaluatorContext context)
- {
- _activeContexts.Add(context);
- _activeContextIds.Add(context.Id);
- }
-
- private void RemoveEvaluatorContext(EvaluatorContext context)
- {
- _activeContexts.Remove(context);
- _activeContextIds.Remove(context.Id);
- }
-
- [NamedParameter(documentation: "The Evaluator Identifier.")]
- public class EvaluatorIdentifier : Name<string>
- {
- }
-
- [NamedParameter(documentation: "The Evaluator Host.")]
- public class EvaluatorDescriptorName : Name<EvaluatorDescriptorImpl>
- {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
index 74fbf5f..692f39d 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
+++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
@@ -84,14 +84,12 @@ under the License.
<Compile Include="Bridge\ILogger.cs" />
<Compile Include="Bridge\ReefHttpRequest.cs" />
<Compile Include="Bridge\ReefHttpResponse.cs" />
- <Compile Include="ClientManager.cs" />
<Compile Include="Constants.cs" />
<Compile Include="Context\ContextConfiguration.cs" />
<Compile Include="Context\ContextConfigurationOptions.cs" />
<Compile Include="Context\Defaults\DefaultContextMessageSource.cs" />
<Compile Include="Context\Defaults\DefaultContextStartHandler.cs" />
<Compile Include="Context\Defaults\DefaultContextStopHandler.cs" />
- <Compile Include="Context\EvaluatorContext.cs" />
<Compile Include="Context\IActiveContext.cs" />
<Compile Include="Context\IClosedContext.cs" />
<Compile Include="Context\IContext.cs" />
@@ -120,11 +118,7 @@ under the License.
<Compile Include="Defaults\DefaultTaskSuspensionHandler.cs" />
<Compile Include="DriverConfigGenerator.cs" />
<Compile Include="DriverConfigurationSettings.cs" />
- <Compile Include="DriverManager.cs" />
- <Compile Include="DriverRuntimeConfiguration.cs" />
- <Compile Include="DriverRuntimeConfigurationOptions.cs" />
<Compile Include="DriverSubmissionSettings.cs" />
- <Compile Include="EvaluatorManager.cs" />
<Compile Include="Evaluator\EvaluatorDescriptorImpl.cs" />
<Compile Include="Evaluator\EvaluatorRequest.cs" />
<Compile Include="Evaluator\EvaluatorRequestBuilder.cs" />
@@ -143,7 +137,6 @@ under the License.
<Compile Include="Task\IRunningTask.cs" />
<Compile Include="Task\ISuspendedTask.cs" />
<Compile Include="Task\ITaskMessage.cs" />
- <Compile Include="Task\RunningTaskImpl.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Org.Apache.REEF.Driver.nuspec" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5d577145/lang/cs/Org.Apache.REEF.Driver/Task/RunningTaskImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Task/RunningTaskImpl.cs b/lang/cs/Org.Apache.REEF.Driver/Task/RunningTaskImpl.cs
deleted file mode 100644
index 056c188..0000000
--- a/lang/cs/Org.Apache.REEF.Driver/Task/RunningTaskImpl.cs
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Globalization;
-using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
-using Org.Apache.REEF.Driver.Context;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Driver.Task
-{
- [Obsolete("Driver core logic no longer needed in.NET")]
- public class RunningTaskImpl : IRunningTask
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(RunningTaskImpl));
-
- private readonly string _id;
-
- private readonly EvaluatorContext _evaluatorContext;
-
- public RunningTaskImpl(string taskId, EvaluatorContext evaluatorContext)
- {
- _id = taskId;
- _evaluatorContext = evaluatorContext;
- }
-
- public string Id
- {
- get
- {
- return _id;
- }
-
- set
- {
- }
- }
-
- public IActiveContext ActiveContext
- {
- get
- {
- return _evaluatorContext;
- }
-
- set
- {
- }
- }
-
- public void Dispose()
- {
- ContextControlProto contextControlProto = new ContextControlProto();
- contextControlProto.stop_task = new StopTaskProto();
- }
-
- public void Dispose(byte[] message)
- {
- ContextControlProto contextControlProto = new ContextControlProto();
- contextControlProto.stop_task = new StopTaskProto();
- contextControlProto.task_message = message;
- }
-
- public void OnNext(byte[] message)
- {
- ContextControlProto contextControlProto = new ContextControlProto();
- contextControlProto.task_message = message;
- }
-
- public void Suspend(byte[] message)
- {
- ContextControlProto contextControlProto = new ContextControlProto();
- contextControlProto.suspend_task = new SuspendTaskProto();
- contextControlProto.task_message = message;
- }
-
- public void Suspend()
- {
- ContextControlProto contextControlProto = new ContextControlProto();
- contextControlProto.suspend_task = new SuspendTaskProto();
- }
-
- public override string ToString()
- {
- return "TaskRuntime with taskId = " + _id;
- }
-
- public override int GetHashCode()
- {
- return _id.GetHashCode();
- }
-
- public void Send(byte[] message)
- {
- LOGGER.Log(Level.Info, "RunningTaskImpl.Send() is called");
- }
- }
-}