You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/10/23 02:03:45 UTC
[14/51] [abbrv] [partial] Initial merge of Wake,
Tang and REEF into one repository and project
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputFormatLoadingService.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputFormatLoadingService.java b/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputFormatLoadingService.java
deleted file mode 100644
index aec2227..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputFormatLoadingService.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.data.loading.impl;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.microsoft.reef.annotations.audience.DriverSide;
-import com.microsoft.reef.driver.context.ActiveContext;
-import com.microsoft.reef.driver.context.ContextConfiguration;
-import com.microsoft.reef.driver.context.ServiceConfiguration;
-import com.microsoft.reef.driver.evaluator.AllocatedEvaluator;
-import com.microsoft.reef.io.data.loading.api.DataLoadingRequestBuilder;
-import com.microsoft.reef.io.data.loading.api.DataLoadingService;
-import com.microsoft.reef.io.data.loading.api.DataSet;
-import com.microsoft.tang.Configuration;
-import com.microsoft.tang.Tang;
-import com.microsoft.tang.annotations.Parameter;
-import com.microsoft.tang.exceptions.BindException;
-
-/**
- * An implementation of {@link DataLoadingService}
- * that uses the Hadoop {@link InputFormat} to find
- * partitions of data & request resources.
- * <p/>
- * The InputFormat is injected using a Tang external constructor
- * <p/>
- * It also tries to obtain data locality in a greedy
- * fashion using {@link EvaluatorToPartitionMapper}
- */
-@DriverSide
-public class InputFormatLoadingService<K, V> implements DataLoadingService {
-
- private static final Logger LOG = Logger.getLogger(InputFormatLoadingService.class.getName());
-
- private static final String DATA_LOAD_CONTEXT_PREFIX = "DataLoadContext-";
-
- private static final String COMPUTE_CONTEXT_PREFIX =
- "ComputeContext-" + new Random(3381).nextInt(1 << 20) + "-";
-
- private final EvaluatorToPartitionMapper<InputSplit> evaluatorToPartitionMapper;
- private final int numberOfPartitions;
-
- private final boolean inMemory;
-
- private final String inputFormatClass;
-
- private final String inputPath;
-
- @Inject
- public InputFormatLoadingService(
- final InputFormat<K, V> inputFormat,
- final JobConf jobConf,
- final @Parameter(DataLoadingRequestBuilder.NumberOfDesiredSplits.class) int numberOfDesiredSplits,
- final @Parameter(DataLoadingRequestBuilder.LoadDataIntoMemory.class) boolean inMemory,
- final @Parameter(JobConfExternalConstructor.InputFormatClass.class) String inputFormatClass,
- final @Parameter(JobConfExternalConstructor.InputPath.class) String inputPath) {
-
- this.inMemory = inMemory;
- this.inputFormatClass = inputFormatClass;
- this.inputPath = inputPath;
-
-
- try {
-
- final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, numberOfDesiredSplits);
- if (LOG.isLoggable(Level.FINEST)) {
- LOG.log(Level.FINEST, "Splits: {0}", Arrays.toString(inputSplits));
- }
-
- this.numberOfPartitions = inputSplits.length;
- LOG.log(Level.FINE, "Number of partitions: {0}", this.numberOfPartitions);
-
- this.evaluatorToPartitionMapper = new EvaluatorToPartitionMapper<>(inputSplits);
-
- } catch (final IOException e) {
- throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
- }
- }
-
- @Override
- public int getNumberOfPartitions() {
- return this.numberOfPartitions;
- }
-
- @Override
- public Configuration getContextConfiguration(final AllocatedEvaluator allocatedEvaluator) {
-
- final NumberedSplit<InputSplit> numberedSplit =
- this.evaluatorToPartitionMapper.getInputSplit(
- allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName(),
- allocatedEvaluator.getId());
-
- return ContextConfiguration.CONF
- .set(ContextConfiguration.IDENTIFIER, DATA_LOAD_CONTEXT_PREFIX + numberedSplit.getIndex())
- .build();
- }
-
- @Override
- public Configuration getServiceConfiguration(final AllocatedEvaluator allocatedEvaluator) {
-
- try {
-
- final NumberedSplit<InputSplit> numberedSplit =
- this.evaluatorToPartitionMapper.getInputSplit(
- allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName(),
- allocatedEvaluator.getId());
-
- final Configuration serviceConfiguration = ServiceConfiguration.CONF
- .set(ServiceConfiguration.SERVICES,
- this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class)
- .build();
-
- return Tang.Factory.getTang().newConfigurationBuilder(serviceConfiguration)
- .bindImplementation(
- DataSet.class,
- this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class)
- .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass)
- .bindNamedParameter(JobConfExternalConstructor.InputPath.class, inputPath)
- .bindNamedParameter(
- InputSplitExternalConstructor.SerializedInputSplit.class,
- WritableSerializer.serialize(numberedSplit.getEntry()))
- .bindConstructor(InputSplit.class, InputSplitExternalConstructor.class)
- .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
- .build();
-
- } catch (final BindException ex) {
- final String evalId = allocatedEvaluator.getId();
- final String msg = "Unable to create configuration for evaluator " + evalId;
- LOG.log(Level.WARNING, msg, ex);
- throw new RuntimeException(msg, ex);
- }
- }
-
- @Override
- public String getComputeContextIdPrefix() {
- return COMPUTE_CONTEXT_PREFIX;
- }
-
- @Override
- public boolean isComputeContext(final ActiveContext context) {
- return context.getId().startsWith(COMPUTE_CONTEXT_PREFIX);
- }
-
- @Override
- public boolean isDataLoadedContext(final ActiveContext context) {
- return context.getId().startsWith(DATA_LOAD_CONTEXT_PREFIX);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputSplitExternalConstructor.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputSplitExternalConstructor.java b/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputSplitExternalConstructor.java
deleted file mode 100644
index efa3c33..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/InputSplitExternalConstructor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.data.loading.impl;
-
-import javax.inject.Inject;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.microsoft.reef.annotations.audience.TaskSide;
-import com.microsoft.tang.ExternalConstructor;
-import com.microsoft.tang.annotations.Name;
-import com.microsoft.tang.annotations.NamedParameter;
-import com.microsoft.tang.annotations.Parameter;
-
-/**
- * A Tang external constructor to inject an InputSplit
- * by deserializing the serialized input split assigned
- * to this evaluator
- */
-@TaskSide
-public class InputSplitExternalConstructor implements ExternalConstructor<InputSplit> {
-
- @NamedParameter
- public static final class SerializedInputSplit implements Name<String> { }
-
- private final InputSplit inputSplit;
-
- @Inject
- public InputSplitExternalConstructor(
- final JobConf jobConf,
- @Parameter(SerializedInputSplit.class) final String serializedInputSplit){
- this.inputSplit = WritableSerializer.deserialize(serializedInputSplit, jobConf);
- }
-
- @Override
- public InputSplit newInstance() {
- return inputSplit;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/JobConfExternalConstructor.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/JobConfExternalConstructor.java b/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/JobConfExternalConstructor.java
deleted file mode 100644
index dab18f5..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/JobConfExternalConstructor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.data.loading.impl;
-
-import com.microsoft.tang.ExternalConstructor;
-import com.microsoft.tang.annotations.Name;
-import com.microsoft.tang.annotations.NamedParameter;
-import com.microsoft.tang.annotations.Parameter;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import javax.inject.Inject;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class JobConfExternalConstructor implements ExternalConstructor<JobConf> {
-
- private static final Logger LOG = Logger.getLogger(JobConfExternalConstructor.class.getName());
-
- private final String inputFormatClassName;
- private final String inputPath;
-
- @NamedParameter()
- public static final class InputFormatClass implements Name<String> {
- }
-
- @NamedParameter(default_value = "NULL")
- public static final class InputPath implements Name<String> {
- }
-
- @Inject
- public JobConfExternalConstructor(
- final @Parameter(InputFormatClass.class) String inputFormatClassName,
- final @Parameter(InputPath.class) String inputPath) {
- this.inputFormatClassName = inputFormatClassName;
- this.inputPath = inputPath;
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public JobConf newInstance() {
-
- final JobConf jobConf = new JobConf();
-
- try {
-
- final Class<? extends InputFormat> inputFormatClass =
- (Class<? extends InputFormat>) Class.forName(this.inputFormatClassName);
-
- jobConf.setInputFormat(inputFormatClass);
-
- final Method addInputPath =
- inputFormatClass.getMethod("addInputPath", JobConf.class, Path.class);
-
- addInputPath.invoke(inputFormatClass, jobConf, new Path(this.inputPath));
-
- } catch (final ClassNotFoundException ex) {
- throw new RuntimeException("InputFormat: " + this.inputFormatClassName
- + " ClassNotFoundException while creating newInstance of JobConf", ex);
- } catch (final InvocationTargetException | IllegalAccessException ex) {
- throw new RuntimeException("InputFormat: " + this.inputFormatClassName
- + ".addInputPath() method exists, but cannot be called.", ex);
- } catch (final NoSuchMethodException ex) {
- LOG.log(Level.INFO, "{0}.addInputPath() method does not exist", this.inputFormatClassName);
- }
-
- return jobConf;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/NumberedSplit.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/NumberedSplit.java b/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/NumberedSplit.java
deleted file mode 100644
index 708729b..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/NumberedSplit.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.data.loading.impl;
-
-import org.apache.hadoop.mapred.InputSplit;
-
-/**
- * A tuple of an object of type E and an integer index
- * Used inside {@link EvaluatorToPartitionMapper} to
- * mark the partitions associated with each {@link InputSplit}
- *
- * @param <E>
- */
-final class NumberedSplit<E> implements Comparable<NumberedSplit<E>> {
- private final E entry;
- private final int index;
-
- public NumberedSplit(final E entry, final int index) {
- super();
- if (entry == null) {
- throw new IllegalArgumentException("Entry cannot be null");
- }
- this.entry = entry;
- this.index = index;
- }
-
- public E getEntry() {
- return entry;
- }
-
- public int getIndex() {
- return index;
- }
-
- @Override
- public String toString() {
- return "InputSplit-" + index;
- }
-
- @Override
- public int compareTo(final NumberedSplit<E> o) {
- if (this.index == o.index)
- return 0;
- if (this.index < o.index)
- return -1;
- else
- return 1;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/WritableSerializer.java b/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/WritableSerializer.java
deleted file mode 100644
index c3515de..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/data/loading/impl/WritableSerializer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.data.loading.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.microsoft.reef.io.serialization.Codec;
-
-/**
- * A serializer class that serializes {@link Writable}s
- * into String using the below {@link Codec} that
- * encodes & decodes {@link Writable}s
- * By default this stores the class name in the serialized
- * form so that the specific type can be instantiated on
- * de-serialization. However, this also needs the jobconf
- * to passed in while de-serialization
- */
-public class WritableSerializer {
- public static <E extends Writable> String serialize(E writable){
- final WritableCodec<E> writableCodec = new WritableCodec<>();
- return Base64.encodeBase64String(writableCodec.encode(writable));
- }
-
- public static <E extends Writable> E deserialize(String serializedWritable){
- final WritableCodec<E> writableCodec = new WritableCodec<>();
- return writableCodec.decode(Base64.decodeBase64(serializedWritable));
- }
-
- public static <E extends Writable> E deserialize(String serializedWritable, JobConf jobConf){
- final WritableCodec<E> writableCodec = new WritableCodec<>(jobConf);
- return writableCodec.decode(Base64.decodeBase64(serializedWritable));
- }
-
- static class WritableCodec<E extends Writable> implements Codec<E>{
- private final JobConf jobConf;
-
- public WritableCodec(JobConf jobConf) {
- this.jobConf = jobConf;
- }
-
- public WritableCodec() {
- this.jobConf = new JobConf();
- }
-
- @Override
- public E decode(byte[] bytes) {
- final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- try(DataInputStream dais = new DataInputStream(bais)){
- final String className = dais.readUTF();
- E writable = (E) ReflectionUtils.newInstance(Class.forName(className), jobConf);
- writable.readFields(dais);
- return writable;
- } catch (IOException e) {
- throw new RuntimeException("Could not de-serialize JobConf", e);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Could not instantiate specific writable class", e);
- }
- }
-
- @Override
- public byte[] encode(E writable) {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try(final DataOutputStream daos = new DataOutputStream(baos)) {
- daos.writeUTF(writable.getClass().getName());
- writable.write(daos);
- return baos.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException("Could not serialize JobConf", e);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/Cache.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/Cache.java b/reef-io/src/main/java/com/microsoft/reef/io/network/Cache.java
deleted file mode 100644
index d395eda..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/Cache.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network;
-
-import com.microsoft.reef.exception.evaluator.NetworkException;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Cache for network and naming services
- */
-public interface Cache<K,V> {
- /**
- * Constructs with timeout
- * key is evicted when it's not used for timeout milli-seconds
- */
-
- /**
- * Returns a value for the key if cached; otherwise creates, caches and returns
- * When it creates a value for a key, only one callable for the key is executed
- *
- * @param key a key
- * @param callable a value fetcher
- * @return a value
- * @throws NetworkException
- */
- public V get(K key, Callable<V> valueFetcher) throws ExecutionException;
-
- /**
- * Invalidates a key from the cache
- * @param key a key
- */
- public void invalidate(K key);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/Connection.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/Connection.java b/reef-io/src/main/java/com/microsoft/reef/io/network/Connection.java
deleted file mode 100644
index bf51233..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/Connection.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network;
-
-import com.microsoft.reef.exception.evaluator.NetworkException;
-
-/**
- * Connection between two end-points named by identifiers.
- *
- * @param <T> type
- */
-public interface Connection<T> extends AutoCloseable {
-
- /**
- * Opens the connection.
- *
- * @throws NetworkException
- */
- void open() throws NetworkException;
-
- /**
- * Writes an object to the connection.
- *
- * @param obj
- * @throws NetworkException
- */
- void write(T obj) throws NetworkException;
-
- /**
- * Closes the connection.
- *
- * @throws NetworkException
- */
- @Override
- void close() throws NetworkException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/ConnectionFactory.java b/reef-io/src/main/java/com/microsoft/reef/io/network/ConnectionFactory.java
deleted file mode 100644
index 79cd9aa..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/ConnectionFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network;
-
-import com.microsoft.wake.Identifier;
-
-/**
- * Factory that creates a new connection
- *
- * @param <T> type
- */
-public interface ConnectionFactory<T> {
-
- /**
- * Creates a new connection
- *
- * @param destId a destination identifier
- * @return a connection
- */
- public Connection<T> newConnection(Identifier destId);
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/Message.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/Message.java b/reef-io/src/main/java/com/microsoft/reef/io/network/Message.java
deleted file mode 100644
index f35c8d7..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/Message.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network;
-
-import com.microsoft.wake.Identifier;
-
-/**
- * Network message
- *
- * @param <T>
- */
-public interface Message<T> {
-
- /**
- * Gets a source identifier
- *
- * @return an identifier
- */
- Identifier getSrcId();
-
- /**
- * Gets a destination identifier
- *
- * @return an identifier
- */
- Identifier getDestId();
-
- /**
- * Gets data
- *
- * @return an iterable of data objects
- */
- Iterable<T> getData();
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/TransportFactory.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/TransportFactory.java b/reef-io/src/main/java/com/microsoft/reef/io/network/TransportFactory.java
deleted file mode 100644
index ce1731c..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/TransportFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network;
-
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.remote.impl.TransportEvent;
-import com.microsoft.wake.remote.transport.Transport;
-
-/**
- * Factory that creates a transport
- */
-public interface TransportFactory {
-
- /**
- * Creates a transport
- *
- * @param port a listening port
- * @param clientHandler a transport client-side handler
- * @param serverHandler a transport server-side handler
- * @param exHandler an exception handler
- * @return
- */
- public Transport create(int port,
- EventHandler<TransportEvent> clientHandler,
- EventHandler<TransportEvent> serverHandler,
- EventHandler<Exception> exHandler);
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/exception/NetworkRuntimeException.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/NetworkRuntimeException.java b/reef-io/src/main/java/com/microsoft/reef/io/network/exception/NetworkRuntimeException.java
deleted file mode 100644
index 3df006f..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/NetworkRuntimeException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.exception;
-
-/**
- * Network service resourcemanager exception
- */
-public class NetworkRuntimeException extends RuntimeException {
- private static final long serialVersionUID = 1L;
-
- /**
- * Constructs a new network resourcemanager exception with the specified detail message and cause
- *
- * @param s the detailed message
- * @param e the cause
- */
- public NetworkRuntimeException(String s, Throwable e) {
- super(s, e);
- }
-
- /**
- * Constructs a new network resourcemanager exception with the specified detail message
- *
- * @param s the detailed message
- */
- public NetworkRuntimeException(String s) {
- super(s);
- }
-
- /**
- * Constructs a new network resourcemanager exception with the specified cause
- *
- * @param e the cause
- */
- public NetworkRuntimeException(Throwable e) {
- super(e);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/exception/ParentDeadException.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/ParentDeadException.java b/reef-io/src/main/java/com/microsoft/reef/io/network/exception/ParentDeadException.java
deleted file mode 100644
index 7374497..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/ParentDeadException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2013 Microsoft.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.exception;
-
-/**
- * This exception is thrown by a child task
- * when it is told that its Parent died
- */
-public class ParentDeadException extends Exception {
-
- private static final long serialVersionUID = 7636542209271151579L;
-
- public ParentDeadException() {
- }
-
- public ParentDeadException(final String message) {
- super(message);
- }
-
- public ParentDeadException(final Throwable cause) {
- super(cause);
- }
-
- public ParentDeadException(final String message, final Throwable cause) {
- super(message, cause);
- }
-
- public ParentDeadException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/exception/package-info.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/package-info.java b/reef-io/src/main/java/com/microsoft/reef/io/network/exception/package-info.java
deleted file mode 100644
index 43a646d..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/exception/package-info.java
+++ /dev/null
@@ -1,16 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.exception;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/BindNSToTask.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/BindNSToTask.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/BindNSToTask.java
deleted file mode 100644
index 25c8b61..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/BindNSToTask.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import javax.inject.Inject;
-
-import com.microsoft.reef.task.events.TaskStart;
-import com.microsoft.tang.annotations.Parameter;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.IdentifierFactory;
-
-public class BindNSToTask implements EventHandler<TaskStart>{
-
- private final NetworkService<?> ns;
- private final IdentifierFactory idFac;
-
- @Inject
- public BindNSToTask(
- final NetworkService<?> ns,
- final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory idFac) {
- this.ns = ns;
- this.idFac = idFac;
- }
-
- @Override
- public void onNext(final TaskStart task) {
- this.ns.registerId(this.idFac.getNewInstance(task.getId()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/MessagingTransportFactory.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/MessagingTransportFactory.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/MessagingTransportFactory.java
deleted file mode 100644
index dcc7bb0..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/MessagingTransportFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.io.network.TransportFactory;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.impl.SyncStage;
-import com.microsoft.wake.remote.NetUtils;
-import com.microsoft.wake.remote.impl.TransportEvent;
-import com.microsoft.wake.remote.transport.Transport;
-import com.microsoft.wake.remote.transport.netty.NettyMessagingTransport;
-
-import javax.inject.Inject;
-
-/**
- * Factory that creates a messaging transport
- */
-public class MessagingTransportFactory implements TransportFactory {
-
- @Inject
- public MessagingTransportFactory() {
- }
-
- /**
- * Creates a transport
- *
- * @param port a listening port
- * @param clientHandler a transport client side handler
- * @param serverHandler a transport server side handler
- * @param exHandler a exception handler
- */
- @Override
- public Transport create(final int port,
- final EventHandler<TransportEvent> clientHandler,
- final EventHandler<TransportEvent> serverHandler,
- final EventHandler<Exception> exHandler) {
-
- final Transport transport = new NettyMessagingTransport(NetUtils.getLocalAddress(),
- port, new SyncStage<>(clientHandler), new SyncStage<>(serverHandler), 3, 10000);
-
- transport.registerErrorHandler(exHandler);
- return transport;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSConnection.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSConnection.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSConnection.java
deleted file mode 100644
index 2df5995..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSConnection.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.exception.evaluator.NetworkException;
-import com.microsoft.reef.io.network.Connection;
-import com.microsoft.reef.io.network.naming.exception.NamingException;
-import com.microsoft.wake.Identifier;
-import com.microsoft.wake.remote.Codec;
-import com.microsoft.wake.remote.transport.Link;
-import com.microsoft.wake.remote.transport.LinkListener;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * A connection from the network service
- */
-class NSConnection<T> implements Connection<T> {
-
- private static final Logger LOG = Logger.getLogger(NSConnection.class.getName());
-
- private final Identifier srcId;
- private final Identifier destId;
- private final LinkListener<NSMessage<T>> listener;
- private final NetworkService<T> service;
- private final Codec<NSMessage<T>> codec;
-
- // link can change when an endpoint physical address changes
- private Link<NSMessage<T>> link;
-
- /**
- * Constructs a connection
- *
- * @param srcId a source identifier
- * @param destId a destination identifier
- * @param listener a link listener
- * @param service a network service
- */
- NSConnection(final Identifier srcId, final Identifier destId,
- final LinkListener<T> listener, final NetworkService<T> service) {
- this.srcId = srcId;
- this.destId = destId;
- this.listener = new NSMessageLinkListener<>(listener);
- this.service = service;
- this.codec = new NSMessageCodec<>(service.getCodec(), service.getIdentifierFactory());
- }
-
- /**
- * Opens the connection.
- */
- @Override
- public void open() throws NetworkException {
-
- LOG.log(Level.FINE, "looking up {0}", this.destId);
-
- try {
- // naming lookup
- final InetSocketAddress addr = this.service.getNameClient().lookup(this.destId);
- if (addr == null) {
- final NetworkException ex = new NamingException("Cannot resolve " + this.destId);
- LOG.log(Level.WARNING, "Cannot resolve " + this.destId, ex);
- throw ex;
- }
-
- LOG.log(Level.FINE, "Resolved {0} to {1}", new Object[] { this.destId, addr });
-
- // connect to a remote address
- this.link = this.service.getTransport().open(addr, this.codec, this.listener);
- LOG.log(Level.FINE, "Transport returned a link {0}", this.link);
-
- } catch (final Exception ex) {
- LOG.log(Level.WARNING, "Could not open " + this.destId, ex);
- throw new NetworkException(ex);
- }
- }
-
- /**
- * Writes an object to the connection
- *
- * @param obj an object of type T
- * @throws a network exception
- */
- @Override
- public void write(final T obj) throws NetworkException {
- try {
- this.link.write(new NSMessage<T>(this.srcId, this.destId, obj));
- } catch (final IOException ex) {
- LOG.log(Level.WARNING, "Could not write to " + this.destId, ex);
- throw new NetworkException(ex);
- }
- }
-
- /**
- * Closes the connection and unregisters it from the service
- */
- @Override
- public void close() throws NetworkException {
- this.service.remove(this.destId);
- }
-}
-
-/**
- * No-op link listener
- *
- * @param <T>
- */
-final class NSMessageLinkListener<T> implements LinkListener<NSMessage<T>> {
-
- NSMessageLinkListener(final LinkListener<T> listener) {
- }
-
- @Override
- public void messageReceived(final NSMessage<T> message) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessage.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessage.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessage.java
deleted file mode 100644
index 63a8dc9..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessage.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.io.network.Message;
-import com.microsoft.wake.Identifier;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Network service message that implements the Message interface
- *
- * @param <T> type
- */
-public class NSMessage<T> implements Message<T> {
- private final Identifier srcId;
- private final Identifier destId;
- private final List<T> data;
-
- /**
- * Constructs a network service message
- *
- * @param srcId a source identifier
- * @param destId a destination identifier
- * @param data data of type T
- */
- public NSMessage(final Identifier srcId, final Identifier destId, final T data) {
- this.srcId = srcId;
- this.destId = destId;
- this.data = new ArrayList<T>(1);
- this.data.add(data);
- }
-
- /**
- * Constructs a network service message
- *
- * @param srcId a source identifier
- * @param destId a destination identifier
- * @param data a list of data of type T
- */
- public NSMessage(final Identifier srcId, final Identifier destId, final List<T> data) {
- this.srcId = srcId;
- this.destId = destId;
- this.data = data;
- }
-
- /**
- * Gets a source identifier
- *
- * @return an identifier
- */
- public Identifier getSrcId() {
- return srcId;
- }
-
- /**
- * Gets a destination identifier
- *
- * @return an identifier
- */
- public Identifier getDestId() {
- return destId;
- }
-
- /**
- * Gets data
- *
- * @return data
- */
- public List<T> getData() {
- return data;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessageCodec.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessageCodec.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessageCodec.java
deleted file mode 100644
index 1cdd7a7..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NSMessageCodec.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.microsoft.reef.io.network.exception.NetworkRuntimeException;
-import com.microsoft.reef.io.network.proto.ReefNetworkServiceProtos.NSMessagePBuf;
-import com.microsoft.reef.io.network.proto.ReefNetworkServiceProtos.NSRecordPBuf;
-import com.microsoft.wake.Identifier;
-import com.microsoft.wake.IdentifierFactory;
-import com.microsoft.wake.remote.Codec;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Network service message codec
- *
- * @param <T> type
- */
-public class NSMessageCodec<T> implements Codec<NSMessage<T>> {
-
- private final Codec<T> codec;
- private final IdentifierFactory factory;
- private final boolean isStreamingCodec;
-
- /**
- * Constructs a network service message codec
- *
- * @param codec a codec
- * @param factory an identifier factory
- */
- public NSMessageCodec(final Codec<T> codec, final IdentifierFactory factory) {
- this.codec = codec;
- this.factory = factory;
- this.isStreamingCodec = codec instanceof StreamingCodec;
- }
-
- /**
- * Encodes a network service message to bytes
- *
- * @param obj a message
- * @return bytes
- */
- @Override
- public byte[] encode(final NSMessage<T> obj) {
- if(isStreamingCodec) {
- final StreamingCodec<T> streamingCodec = (StreamingCodec<T>) codec;
- try(ByteArrayOutputStream baos = new ByteArrayOutputStream()){
- try(DataOutputStream daos = new DataOutputStream(baos)){
- daos.writeUTF(obj.getSrcId().toString());
- daos.writeUTF(obj.getDestId().toString());
- daos.writeInt(obj.getData().size());
- for (final T rec : obj.getData()) {
- streamingCodec.encodeToStream(rec, daos);
- }
- }
- return baos.toByteArray();
- } catch (final IOException e) {
- throw new RuntimeException("IOException", e);
- }
- }
- else {
- final NSMessagePBuf.Builder pbuf = NSMessagePBuf.newBuilder();
- pbuf.setSrcid(obj.getSrcId().toString());
- pbuf.setDestid(obj.getDestId().toString());
- for (final T rec : obj.getData()) {
- final NSRecordPBuf.Builder rbuf = NSRecordPBuf.newBuilder();
- rbuf.setData(ByteString.copyFrom(codec.encode(rec)));
- pbuf.addMsgs(rbuf);
- }
- return pbuf.build().toByteArray();
- }
- }
-
- /**
- * Decodes a network service message from bytes
- *
- * @param buf bytes
- * @return a message
- */
- @Override
- public NSMessage<T> decode(final byte[] buf) {
- if (isStreamingCodec) {
- final StreamingCodec<T> streamingCodec = (StreamingCodec<T>) codec;
- try(ByteArrayInputStream bais = new ByteArrayInputStream(buf)){
- try(DataInputStream dais = new DataInputStream(bais)){
- final Identifier srcId = factory.getNewInstance(dais.readUTF());
- final Identifier destId = factory.getNewInstance(dais.readUTF());
- final int size = dais.readInt();
- final List<T> list = new ArrayList<T>(size);
- for(int i=0;i<size;i++) {
- list.add(streamingCodec.decodeFromStream(dais));
- }
- return new NSMessage<>(srcId, destId, list);
- }
- } catch (final IOException e) {
- throw new RuntimeException("IOException", e);
- }
- } else {
- NSMessagePBuf pbuf;
- try {
- pbuf = NSMessagePBuf.parseFrom(buf);
- } catch (final InvalidProtocolBufferException e) {
- e.printStackTrace();
- throw new NetworkRuntimeException(e);
- }
- final List<T> list = new ArrayList<T>();
- for (final NSRecordPBuf rbuf : pbuf.getMsgsList()) {
- list.add(codec.decode(rbuf.getData().toByteArray()));
- }
- return new NSMessage<T>(factory.getNewInstance(pbuf.getSrcid()), factory.getNewInstance(pbuf.getDestid()), list);
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NameServiceCloseHandler.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NameServiceCloseHandler.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NameServiceCloseHandler.java
deleted file mode 100644
index f5804ec..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NameServiceCloseHandler.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.evaluator.context.events.ContextStop;
-import com.microsoft.reef.io.network.naming.NameServer;
-import com.microsoft.wake.EventHandler;
-
-import javax.inject.Inject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public final class NameServiceCloseHandler implements EventHandler<ContextStop> {
-
- private static final Logger LOG = Logger.getLogger(NameServiceCloseHandler.class.getName());
-
- private final AutoCloseable toClose;
-
- @Inject
- public NameServiceCloseHandler(final NameServer toClose) {
- this.toClose = toClose;
- }
-
- @Override
- public void onNext(final ContextStop event) {
- try {
- LOG.log(Level.FINEST, "Closing {0}", this.toClose);
- this.toClose.close();
- } catch (final Throwable ex) {
- LOG.log(Level.SEVERE, "Exception while closing " + this.toClose, ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkService.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkService.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkService.java
deleted file mode 100644
index c86a7cf..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkService.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.io.Tuple;
-import com.microsoft.reef.io.naming.Naming;
-import com.microsoft.reef.io.network.Connection;
-import com.microsoft.reef.io.network.ConnectionFactory;
-import com.microsoft.reef.io.network.Message;
-import com.microsoft.reef.io.network.TransportFactory;
-import com.microsoft.reef.io.network.naming.NameCache;
-import com.microsoft.reef.io.network.naming.NameClient;
-import com.microsoft.reef.io.network.naming.NameLookupClient;
-import com.microsoft.reef.io.network.naming.NameServerParameters;
-import com.microsoft.tang.Injector;
-import com.microsoft.tang.Tang;
-import com.microsoft.tang.annotations.Parameter;
-import com.microsoft.tang.exceptions.InjectionException;
-import com.microsoft.wake.*;
-import com.microsoft.wake.impl.LoggingEventHandler;
-import com.microsoft.wake.impl.SingleThreadStage;
-import com.microsoft.wake.remote.Codec;
-import com.microsoft.wake.remote.impl.TransportEvent;
-import com.microsoft.wake.remote.transport.LinkListener;
-import com.microsoft.wake.remote.transport.Transport;
-
-import javax.inject.Inject;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Network service for Task
- */
-public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
-
- private static final Logger LOG = Logger.getLogger(NetworkService.class.getName());
-
- private static final int retryCount;
- private static final int retryTimeout;
-
- static {
- try {
- final Injector injector = Tang.Factory.getTang().newInjector();
- retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class);
- retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class);
- } catch (final InjectionException ex) {
- final String msg = "Exception while trying to find default values for retryCount & Timeout";
- LOG.log(Level.SEVERE, msg, ex);
- throw new RuntimeException(msg, ex);
- }
- }
-
- private Identifier myId;
- private final IdentifierFactory factory;
- private final Codec<T> codec;
- private final Transport transport;
- private final NameClient nameClient;
-
- private final ConcurrentMap<Identifier, Connection<T>> idToConnMap = new ConcurrentHashMap<>();
-
- private final EStage<Tuple<Identifier, InetSocketAddress>> nameServiceRegisteringStage;
- private final EStage<Identifier> nameServiceUnregisteringStage;
-
- public NetworkService(final IdentifierFactory factory,
- final int nsPort,
- final String nameServerAddr,
- final int nameServerPort,
- final Codec<T> codec,
- final TransportFactory tpFactory,
- final EventHandler<Message<T>> recvHandler,
- final EventHandler<Exception> exHandler) {
- this(factory, nsPort, nameServerAddr, nameServerPort,
- retryCount, retryTimeout, codec, tpFactory, recvHandler, exHandler);
- }
-
- @Inject
- public NetworkService(
- final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory factory,
- final @Parameter(NetworkServiceParameters.NetworkServicePort.class) int nsPort,
- final @Parameter(NameServerParameters.NameServerAddr.class) String nameServerAddr,
- final @Parameter(NameServerParameters.NameServerPort.class) int nameServerPort,
- final @Parameter(NameLookupClient.RetryCount.class) int retryCount,
- final @Parameter(NameLookupClient.RetryTimeout.class) int retryTimeout,
- final @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) Codec<T> codec,
- final @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) TransportFactory tpFactory,
- final @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) EventHandler<Message<T>> recvHandler,
- final @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) EventHandler<Exception> exHandler) {
-
- this.factory = factory;
- this.codec = codec;
- this.transport = tpFactory.create(nsPort,
- new LoggingEventHandler<TransportEvent>(),
- new MessageHandler<T>(recvHandler, codec, factory), exHandler);
-
- this.nameClient = new NameClient(nameServerAddr, nameServerPort,
- factory, retryCount, retryTimeout, new NameCache(30000));
-
- this.nameServiceRegisteringStage = new SingleThreadStage<>(
- "NameServiceRegisterer", new EventHandler<Tuple<Identifier, InetSocketAddress>>() {
- @Override
- public void onNext(final Tuple<Identifier, InetSocketAddress> tuple) {
- try {
- nameClient.register(tuple.getKey(), tuple.getValue());
- LOG.log(Level.FINEST, "Registered {0} with nameservice", tuple.getKey());
- } catch (final Exception ex) {
- final String msg = "Unable to register " + tuple.getKey() + "with name service";
- LOG.log(Level.WARNING, msg, ex);
- throw new RuntimeException(msg, ex);
- }
- }
- }, 5);
-
- this.nameServiceUnregisteringStage = new SingleThreadStage<>(
- "NameServiceRegisterer", new EventHandler<Identifier>() {
- @Override
- public void onNext(final Identifier id) {
- try {
- nameClient.unregister(id);
- LOG.log(Level.FINEST, "Unregistered {0} with nameservice", id);
- } catch (final Exception ex) {
- final String msg = "Unable to unregister " + id + " with name service";
- LOG.log(Level.WARNING, msg, ex);
- throw new RuntimeException(msg, ex);
- }
- }
- }, 5);
- }
-
- public void registerId(final Identifier id) {
- this.myId = id;
- final Tuple<Identifier, InetSocketAddress> tuple =
- new Tuple<>(id, (InetSocketAddress) this.transport.getLocalAddress());
- LOG.log(Level.FINEST, "Binding {0} to NetworkService@({1})",
- new Object[]{tuple.getKey(), tuple.getValue()});
- this.nameServiceRegisteringStage.onNext(tuple);
- }
-
- public void unregisterId(Identifier id) {
- this.myId = null;
- LOG.log(Level.FINEST, "Unbinding {0} to NetworkService@({1})",
- new Object[]{id, this.transport.getLocalAddress()});
- this.nameServiceUnregisteringStage.onNext(id);
- }
-
- public Identifier getMyId() {
- return this.myId;
- }
-
- public Transport getTransport() {
- return this.transport;
- }
-
- public Codec<T> getCodec() {
- return this.codec;
- }
-
- public Naming getNameClient() {
- return this.nameClient;
- }
-
- public IdentifierFactory getIdentifierFactory() {
- return this.factory;
- }
-
- void remove(final Identifier id) {
- this.idToConnMap.remove(id);
- }
-
- @Override
- public void close() throws Exception {
- LOG.log(Level.FINE, "Shutting down");
- this.transport.close();
- this.nameClient.close();
- }
-
- @Override
- public Connection<T> newConnection(final Identifier destId) {
-
- if (this.myId == null) {
- throw new RuntimeException(
- "Trying to establish a connection from a Network Service that is not bound to any task");
- }
-
- final Connection<T> conn = this.idToConnMap.get(destId);
- if (conn != null) {
- return conn;
- }
-
- final Connection<T> newConnection = new NSConnection<T>(
- this.myId, destId, new LinkListener<T>() {
- @Override
- public void messageReceived(final Object message) {
- }
- }, this);
-
- final Connection<T> existing = this.idToConnMap.putIfAbsent(destId, newConnection);
- return existing == null ? newConnection : existing;
- }
-}
-
-class MessageHandler<T> implements EventHandler<TransportEvent> {
-
- private final EventHandler<Message<T>> handler;
- private final NSMessageCodec<T> codec;
-
- public MessageHandler(final EventHandler<Message<T>> handler,
- final Codec<T> codec, final IdentifierFactory factory) {
- this.handler = handler;
- this.codec = new NSMessageCodec<T>(codec, factory);
- }
-
- @Override
- public void onNext(final TransportEvent value) {
- final byte[] data = value.getData();
- final NSMessage<T> obj = this.codec.decode(data);
- this.handler.onNext(obj);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceClosingHandler.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceClosingHandler.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceClosingHandler.java
deleted file mode 100644
index 9be5d14..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceClosingHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.evaluator.context.events.ContextStop;
-import com.microsoft.wake.EventHandler;
-
-import javax.inject.Inject;
-
-public class NetworkServiceClosingHandler implements EventHandler<ContextStop> {
- private final NetworkService<?> networkService;
-
- @Inject
- public NetworkServiceClosingHandler(final NetworkService<?> networkService) {
- this.networkService = networkService;
- }
-
- @Override
- public void onNext(ContextStop arg0) {
- try {
- networkService.close();
- } catch (Exception e) {
- throw new RuntimeException("Exception while closing NetworkService", e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceParameters.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceParameters.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceParameters.java
deleted file mode 100644
index 69488aa..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/NetworkServiceParameters.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import com.microsoft.reef.io.network.TransportFactory;
-import com.microsoft.reef.io.network.util.StringIdentifierFactory;
-import com.microsoft.tang.annotations.Name;
-import com.microsoft.tang.annotations.NamedParameter;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.IdentifierFactory;
-import com.microsoft.wake.remote.Codec;
-
-public class NetworkServiceParameters {
-
- @NamedParameter
- public static class TaskId implements Name<String> {
-
- }
-
- @NamedParameter(doc = "identifier factory for the service", short_name = "factory", default_class = StringIdentifierFactory.class)
- public static class NetworkServiceIdentifierFactory implements Name<IdentifierFactory> {
- }
-
- @NamedParameter(doc = "port for the network service", short_name = "nsport", default_value = "7070")
- public static class NetworkServicePort implements Name<Integer> {
- }
-
- @NamedParameter(doc = "codec for the network service", short_name = "nscodec")
- public static class NetworkServiceCodec implements Name<Codec<?>> {
- }
-
- @NamedParameter(doc = "transport factory for the network service", short_name = "nstransportfactory", default_class = MessagingTransportFactory.class)
- public static class NetworkServiceTransportFactory implements Name<TransportFactory> {
- }
-
- @NamedParameter(doc = "network receive handler for the network service", short_name = "nshandler")
- public static class NetworkServiceHandler implements Name<EventHandler<?>> {
- }
-
- @NamedParameter(doc = "network exception handler for the network service", short_name = "exhandler")
- public static class NetworkServiceExceptionHandler implements Name<EventHandler<?>> {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/StreamingCodec.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/StreamingCodec.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/StreamingCodec.java
deleted file mode 100644
index 34e9a85..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/StreamingCodec.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2013 Microsoft.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import com.microsoft.wake.remote.Codec;
-
-/**
- * A codec that can make serialization more efficient when an object has to be
- * codec'ed through a chain of codecs
- */
-public interface StreamingCodec<T> extends Codec<T> {
-
- void encodeToStream (T obj, DataOutputStream stream);
-
- T decodeFromStream (DataInputStream stream);
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/UnbindNSFromTask.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/UnbindNSFromTask.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/UnbindNSFromTask.java
deleted file mode 100644
index 14265f6..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/UnbindNSFromTask.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
-
-import javax.inject.Inject;
-
-import com.microsoft.reef.task.events.TaskStop;
-import com.microsoft.tang.annotations.Parameter;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.IdentifierFactory;
-
-public class UnbindNSFromTask implements EventHandler<TaskStop> {
-
- private final NetworkService<?> ns;
- private final IdentifierFactory idFac;
-
- @Inject
- public UnbindNSFromTask(
- final NetworkService<?> ns,
- final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory idFac) {
- this.ns = ns;
- this.idFac = idFac;
- }
-
- @Override
- public void onNext(final TaskStop task) {
- this.ns.unregisterId(this.idFac.getNewInstance(task.getId()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/impl/package-info.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/package-info.java b/reef-io/src/main/java/com/microsoft/reef/io/network/impl/package-info.java
deleted file mode 100644
index 806b25a..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/impl/package-info.java
+++ /dev/null
@@ -1,16 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.impl;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameAssignmentTuple.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameAssignmentTuple.java b/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameAssignmentTuple.java
deleted file mode 100644
index 34fd260..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameAssignmentTuple.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.naming;
-
-import com.microsoft.reef.io.naming.NameAssignment;
-import com.microsoft.wake.Identifier;
-
-import java.net.InetSocketAddress;
-
-/**
- * An implementation of the NameAssignment interface
- */
-public class NameAssignmentTuple implements NameAssignment {
-
- private final Identifier id;
- private final InetSocketAddress addr;
-
- /**
- * Constructs a name assignment tuple
- *
- * @param id an identifier
- * @param addr an Internet socket address
- */
- public NameAssignmentTuple(Identifier id, InetSocketAddress addr) {
- this.id = id;
- this.addr = addr;
- }
-
- /**
- * Gets an identifier
- *
- * @return an identifier
- */
- @Override
- public Identifier getIdentifier() {
- return id;
- }
-
- /**
- * Gets an address
- *
- * @return an Internet socket address
- */
- @Override
- public InetSocketAddress getAddress() {
- return addr;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameCache.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameCache.java b/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameCache.java
deleted file mode 100644
index 2a4fa52..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameCache.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.naming;
-
-import com.google.common.cache.CacheBuilder;
-import com.microsoft.reef.io.network.Cache;
-import com.microsoft.wake.Identifier;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Naming cache implementation
- */
-public class NameCache implements Cache<Identifier, InetSocketAddress> {
-
- private final com.google.common.cache.Cache<Identifier, InetSocketAddress> cache;
-
- /**
- * Constructs a naming cache
- *
- * @param timeout a cache entry timeout after access
- */
- public NameCache(long timeout) {
- cache = CacheBuilder.newBuilder()
- .expireAfterWrite(timeout, TimeUnit.MILLISECONDS)
- .build();
- }
-
- /**
- * Gets an address for an identifier
- *
- * @param key an identifier
- * @param valueFetcher a callable to load a value for the corresponding identifier
- * @return an Internet socket address
- * @throws ExecutionException
- */
- @Override
- public InetSocketAddress get(Identifier key,
- Callable<InetSocketAddress> valueFetcher) throws ExecutionException {
- return cache.get(key, valueFetcher);
- }
-
- /**
- * Invalidates the entry for an identifier
- *
- * @param key an identifier
- */
- @Override
- public void invalidate(Identifier key) {
- cache.invalidate(key);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameClient.java
----------------------------------------------------------------------
diff --git a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameClient.java b/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameClient.java
deleted file mode 100644
index 84af42c..0000000
--- a/reef-io/src/main/java/com/microsoft/reef/io/network/naming/NameClient.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.network.naming;
-
-import com.microsoft.reef.io.naming.Naming;
-import com.microsoft.reef.io.network.Cache;
-import com.microsoft.reef.io.network.naming.exception.NamingRuntimeException;
-import com.microsoft.reef.io.network.naming.serialization.NamingLookupResponse;
-import com.microsoft.reef.io.network.naming.serialization.NamingMessage;
-import com.microsoft.reef.io.network.naming.serialization.NamingRegisterResponse;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.Identifier;
-import com.microsoft.wake.IdentifierFactory;
-import com.microsoft.wake.Stage;
-import com.microsoft.wake.impl.SyncStage;
-import com.microsoft.wake.remote.Codec;
-import com.microsoft.wake.remote.NetUtils;
-import com.microsoft.wake.remote.impl.TransportEvent;
-import com.microsoft.wake.remote.transport.Transport;
-import com.microsoft.wake.remote.transport.netty.NettyMessagingTransport;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Naming client
- */
-public class NameClient implements Stage, Naming {
- private static final Logger LOG = Logger.getLogger(NameClient.class.getName());
-
- private NameLookupClient lookupClient;
- private NameRegistryClient registryClient;
- private Transport transport;
-
- /**
- * Constructs a naming client
- *
- * @param serverAddr a server address
- * @param serverPort a server port number
- * @param factory an identifier factory
- * @param cache a cache
- */
- public NameClient(String serverAddr, int serverPort,
- IdentifierFactory factory, int retryCount, int retryTimeout,
- Cache<Identifier, InetSocketAddress> cache) {
- this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache);
- }
-
- /**
- * Constructs a naming client
- *
- * @param serverAddr a server address
- * @param serverPort a server port number
- * @param timeout timeout in ms
- * @param factory an identifier factory
- * @param cache a cache
- */
- public NameClient(final String serverAddr, final int serverPort, final long timeout,
- final IdentifierFactory factory, final int retryCount, final int retryTimeout,
- final Cache<Identifier, InetSocketAddress> cache) {
-
- final BlockingQueue<NamingLookupResponse> replyLookupQueue = new LinkedBlockingQueue<NamingLookupResponse>();
- final BlockingQueue<NamingRegisterResponse> replyRegisterQueue = new LinkedBlockingQueue<NamingRegisterResponse>();
- final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
-
- this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), 0,
- new SyncStage<>(new NamingClientEventHandler(
- new NamingResponseHandler(replyLookupQueue, replyRegisterQueue), codec)),
- null, retryCount, retryTimeout);
-
- this.lookupClient = new NameLookupClient(serverAddr, serverPort, timeout,
- factory, retryCount, retryTimeout, replyLookupQueue, this.transport, cache);
-
- this.registryClient = new NameRegistryClient(serverAddr, serverPort, timeout,
- factory, replyRegisterQueue, this.transport);
- }
-
- /**
- * Registers an (identifier, address) mapping
- *
- * @param id an identifier
- * @param addr an Internet socket address
- */
- @Override
- public void register(final Identifier id, final InetSocketAddress addr)
- throws Exception {
- LOG.log(Level.FINE, "Refister {0} : {1}", new Object[] { id, addr });
- this.registryClient.register(id, addr);
- }
-
- /**
- * Unregisters an identifier
- *
- * @param id an identifier
- */
- @Override
- public void unregister(final Identifier id) throws IOException {
- this.registryClient.unregister(id);
- }
-
- /**
- * Finds an address for an identifier
- *
- * @param id an identifier
- * @return an Internet socket address
- */
- @Override
- public InetSocketAddress lookup(final Identifier id) throws Exception {
- return this.lookupClient.lookup(id);
- }
-
- /**
- * Retrieves an address for an identifier remotely
- *
- * @param id an identifier
- * @return an Internet socket address
- * @throws Exception
- */
- public InetSocketAddress remoteLookup(final Identifier id) throws Exception {
- return this.lookupClient.remoteLookup(id);
- }
-
- /**
- * Closes resources
- */
- @Override
- public void close() throws Exception {
-
- if (this.lookupClient != null) {
- this.lookupClient.close();
- }
-
- if (this.registryClient != null) {
- this.registryClient.close();
- }
-
- if (this.transport != null) {
- this.transport.close();
- }
- }
-}
-
-/**
- * Naming client transport event handler
- */
-class NamingClientEventHandler implements EventHandler<TransportEvent> {
-
- private static final Logger LOG = Logger.getLogger(NamingClientEventHandler.class.getName());
-
- private final EventHandler<NamingMessage> handler;
- private final Codec<NamingMessage> codec;
-
- public NamingClientEventHandler(
- final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
- this.handler = handler;
- this.codec = codec;
- }
-
- @Override
- public void onNext(final TransportEvent value) {
- LOG.log(Level.FINE, "Transport: ", value);
- this.handler.onNext(this.codec.decode(value.getData()));
- }
-}
-
-/**
- * Naming response message handler
- */
-class NamingResponseHandler implements EventHandler<NamingMessage> {
-
- private final BlockingQueue<NamingLookupResponse> replyLookupQueue;
- private final BlockingQueue<NamingRegisterResponse> replyRegisterQueue;
-
- NamingResponseHandler(BlockingQueue<NamingLookupResponse> replyLookupQueue,
- BlockingQueue<NamingRegisterResponse> replyRegisterQueue) {
- this.replyLookupQueue = replyLookupQueue;
- this.replyRegisterQueue = replyRegisterQueue;
- }
-
- @Override
- public void onNext(NamingMessage value) {
- if (value instanceof NamingLookupResponse) {
- replyLookupQueue.offer((NamingLookupResponse)value);
- } else if (value instanceof NamingRegisterResponse) {
- replyRegisterQueue.offer((NamingRegisterResponse)value);
- } else {
- throw new NamingRuntimeException("Unknown naming response message");
- }
-
- }
-
-}