You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:58 UTC
[25/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/CodecRamMap.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/CodecRamMap.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/CodecRamMap.java
new file mode 100644
index 0000000..fd2e80f
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/CodecRamMap.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.ram;
+
+import org.apache.reef.io.ExternalMap;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.storage.util.GetAllIterable;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+public class CodecRamMap<T> implements ExternalMap<T> {
+
+ private final Codec<T> c;
+ private final ConcurrentSkipListMap<CharSequence, byte[]> map;
+
+ @Inject
+ public CodecRamMap(RamStorageService ramStore,
+ @Parameter(RamMapCodec.class) final Codec<T> c) {
+ this.c = c;
+ this.map = new ConcurrentSkipListMap<CharSequence, byte[]>();
+ }
+
+ @Override
+ public boolean containsKey(final CharSequence key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public T get(final CharSequence key) {
+ final byte[] ret = map.get(key);
+ return ret != null ? c.decode(ret) : null;
+ }
+
+ @Override
+ public T put(final CharSequence key, T value) {
+ final byte[] ret = map.put(key, c.encode(value));
+ return ret != null ? c.decode(ret) : null;
+ }
+
+ @Override
+ public T remove(final CharSequence key) {
+ final byte[] ret = map.remove(key);
+ return ret != null ? c.decode(ret) : null;
+ }
+
+ @Override
+ public void putAll(final Map<? extends CharSequence, ? extends T> m) {
+ for (final CharSequence x : m.keySet()) {
+ map.put(x, c.encode(m.get(x)));
+ }
+ }
+
+ @Override
+ public Iterable<Map.Entry<CharSequence, T>> getAll(
+ final Set<? extends CharSequence> keys) {
+ return new GetAllIterable<T>(keys, this);
+ }
+
+ @NamedParameter
+ static public class RamMapCodec implements Name<Codec<?>> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java
new file mode 100644
index 0000000..35a20e1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.ram;
+
+import org.apache.reef.io.ExternalMap;
+import org.apache.reef.io.storage.util.GetAllIterable;
+
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Simple in-memory ExternalMap implementation. This class does not require
+ * any codecs, and so is guaranteed to be instantiable. Therefore, it is the
+ * default ExternalMap provided by StorageManagerRam.
+ */
+public class RamMap<T> implements ExternalMap<T> {
+ private final ConcurrentSkipListMap<CharSequence, T> map
+ = new ConcurrentSkipListMap<CharSequence, T>();
+
+ @Inject
+ public RamMap(RamStorageService ramStore) {
+ //this.localStore = localStore;
+ }
+
+ @Override
+ public boolean containsKey(CharSequence key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public T get(CharSequence key) {
+ return map.get(key);
+ }
+
+ @Override
+ public T put(CharSequence key, T value) {
+ return map.put(key, value);
+ }
+
+ @Override
+ public T remove(CharSequence key) {
+ return map.remove(key);
+ }
+
+ @Override
+ public void putAll(Map<? extends CharSequence, ? extends T> m) {
+ map.putAll(m);
+ }
+
+ @Override
+ public Iterable<Entry<CharSequence, T>> getAll(Set<? extends CharSequence> keys) {
+ return new GetAllIterable<>(keys, this);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamSpool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamSpool.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamSpool.java
new file mode 100644
index 0000000..dc7e606
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamSpool.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.ram;
+
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.Spool;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A SpoolFile implementation that is backed by RAM.
+ * <p/>
+ * It uses an ArrayList to store the objects in.
+ */
+public final class RamSpool<T> implements Spool<T> {
+
+ private final List<T> backingStore = new ArrayList<T>();
+ private boolean canAppend = true;
+ private boolean canGetAccumulator = true;
+
+ @Inject
+ public RamSpool(RamStorageService ramStore) {
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ canAppend = false;
+ return backingStore.iterator();
+ }
+
+ @Override
+ public Accumulator<T> accumulator() {
+ if (!canGetAccumulator) {
+ throw new UnsupportedOperationException("Can only getAccumulator() once!");
+ }
+ canGetAccumulator = false;
+ return new Accumulator<T>() {
+ @Override
+ public void add(T datum) {
+ if (!canAppend) {
+ throw new ConcurrentModificationException("Attempt to append after creating iterator!");
+ }
+ backingStore.add(datum);
+ }
+
+ @Override
+ public void close() {
+ canAppend = false;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamStorageService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamStorageService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamStorageService.java
new file mode 100644
index 0000000..64e489a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamStorageService.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.ram;
+
+import org.apache.reef.io.storage.ScratchSpace;
+import org.apache.reef.io.storage.StorageService;
+
+import javax.inject.Inject;
+
+public class RamStorageService implements StorageService {
+ @Inject
+ public RamStorageService() {
+ }
+
+ // TODO move getScratchSpace into its own class, just like everything else.
+ // TODO add context object or something to StorageService
+ @Override
+ public ScratchSpace getScratchSpace() {
+ throw new UnsupportedOperationException(
+ "No temp space / tracking of temp space for main memory (yet).");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/SortingRamSpool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/SortingRamSpool.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/SortingRamSpool.java
new file mode 100644
index 0000000..175c7a6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/SortingRamSpool.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.ram;
+
+import org.apache.reef.exception.evaluator.StorageException;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.Spool;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+public class SortingRamSpool<T> implements Spool<T> {
+ private final PriorityQueue<T> heap;
+ private boolean ready = false;
+ private Accumulator<T> acc = new Accumulator<T>() {
+ @Override
+ public void add(T datum) throws StorageException {
+ if (ready)
+ throw new IllegalStateException("add called after close!");
+ heap.add(datum);
+ }
+
+ @Override
+ public void close() throws StorageException {
+ ready = true;
+ }
+ };
+ private Iterator<T> it = new Iterator<T>() {
+
+ @Override
+ public boolean hasNext() {
+ return !heap.isEmpty();
+ }
+
+ @Override
+ public T next() {
+ return heap.remove();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException(
+ "This iterator consumes the data it returns. remove() does not make any sense!");
+ }
+
+ };
+
+ public SortingRamSpool() {
+ heap = new PriorityQueue<>();
+ }
+
+ public SortingRamSpool(Comparator<T> c) {
+ heap = new PriorityQueue<>(11, c);
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ if (!ready) {
+ throw new IllegalStateException("Cannot call iterator() while accumulator is still open!");
+ }
+ Iterator<T> ret = it;
+ it = null;
+ return ret;
+ }
+
+ @Override
+ public Accumulator<T> accumulator() throws StorageException {
+ Accumulator<T> ret = acc;
+ acc = null;
+ return ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/GetAllIterable.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/GetAllIterable.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/GetAllIterable.java
new file mode 100644
index 0000000..b0b892d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/GetAllIterable.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.io.ExternalMap;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+public class GetAllIterable<T> implements
+ Iterable<Map.Entry<CharSequence, T>> {
+ private final Set<? extends CharSequence> keys;
+ private final ExternalMap<T> map;
+
+ public GetAllIterable(Set<? extends CharSequence> keys, ExternalMap<T> map) {
+ this.keys = keys;
+ this.map = map;
+ }
+
+ @Override
+ public Iterator<Map.Entry<CharSequence, T>> iterator() {
+ final Iterator<? extends CharSequence> k = keys.iterator();
+ return new Iterator<Map.Entry<CharSequence, T>>() {
+ CharSequence lastKey = null;
+ CharSequence curKey = findNextKey();
+
+ private CharSequence findNextKey() {
+ while (k.hasNext()) {
+ CharSequence next = k.next();
+ if (map.containsKey(next)) {
+ return next;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return curKey != null;
+ }
+
+ @Override
+ public Map.Entry<CharSequence, T> next() {
+ final CharSequence key = curKey;
+ curKey = findNextKey();
+ lastKey = key;
+ if (key == null)
+ throw new NoSuchElementException();
+
+ final T v = map.get(key);
+
+ return new Map.Entry<CharSequence, T>() {
+ @Override
+ public CharSequence getKey() {
+ return key;
+ }
+
+ @Override
+ public T getValue() {
+ return v;
+ }
+
+ @Override
+ public T setValue(T v) {
+ throw new UnsupportedOperationException(
+ "No support for mutating values via iterator");
+ }
+ };
+ }
+
+ @Override
+ public void remove() {
+ map.remove(lastKey);
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerCodec.java
new file mode 100644
index 0000000..12e1bd9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerCodec.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.io.serialization.Codec;
+
+public class IntegerCodec implements Codec<Integer> {
+
+ @Override
+ public byte[] encode(Integer obj) {
+ return Integer.toString(obj).getBytes();
+ }
+
+ @Override
+ public Integer decode(byte[] buf) {
+ return Integer.decode(new String(buf));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerDeserializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerDeserializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerDeserializer.java
new file mode 100644
index 0000000..5bd3641
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerDeserializer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.io.serialization.Deserializer;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+public class IntegerDeserializer implements
+ Deserializer<Integer, InputStream> {
+ @Override
+ public Iterable<Integer> create(InputStream arg) {
+ final DataInputStream dis = new DataInputStream(arg);
+ return new Iterable<Integer>() {
+
+ @Override
+ public Iterator<Integer> iterator() {
+ return new Iterator<Integer>() {
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Integer next() {
+ try {
+ return dis.readInt();
+ } catch (IOException e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerSerializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerSerializer.java
new file mode 100644
index 0000000..69296cc
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/IntegerSerializer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.serialization.Serializer;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class IntegerSerializer implements
+ Serializer<Integer, OutputStream> {
+ @Override
+ public Accumulable<Integer> create(OutputStream arg) {
+ final DataOutputStream dos = new DataOutputStream(arg);
+ return new Accumulable<Integer>() {
+
+ @Override
+ public Accumulator<Integer> accumulator() throws ServiceException {
+ return new Accumulator<Integer>() {
+ @Override
+ public void add(Integer datum) throws ServiceException {
+ try {
+ dos.writeInt(datum);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public void close() throws ServiceException {
+ try {
+ dos.close();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ };
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringDeserializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringDeserializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringDeserializer.java
new file mode 100644
index 0000000..c1108f4
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringDeserializer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.exception.evaluator.ServiceRuntimeException;
+import org.apache.reef.io.serialization.Deserializer;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+public class StringDeserializer implements
+ Deserializer<String, InputStream> {
+ @Override
+ public Iterable<String> create(InputStream arg) {
+ final DataInputStream dis = new DataInputStream(arg);
+ return new Iterable<String>() {
+
+ @Override
+ public Iterator<String> iterator() {
+ return new Iterator<String>() {
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String next() {
+ int len = 0;
+ try {
+ len = dis.readInt();
+ byte[] b = new byte[len];
+ dis.readFully(b);
+ return new String(b);
+ } catch (IOException e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringSerializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringSerializer.java
new file mode 100644
index 0000000..6cf544e
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/StringSerializer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.serialization.Serializer;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class StringSerializer implements
+ Serializer<String, OutputStream> {
+ @Override
+ public Accumulable<String> create(OutputStream arg) {
+ final DataOutputStream dos = new DataOutputStream(arg);
+ return new Accumulable<String>() {
+
+ @Override
+ public Accumulator<String> accumulator() throws ServiceException {
+ return new Accumulator<String>() {
+
+ @Override
+ public void add(String datum) throws ServiceException {
+ byte[] b = datum.getBytes();
+ try {
+ dos.writeInt(b.length);
+ dos.write(b);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+
+ }
+
+ @Override
+ public void close() throws ServiceException {
+ try {
+ dos.close();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+ };
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/TupleKeyComparator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/TupleKeyComparator.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/TupleKeyComparator.java
new file mode 100644
index 0000000..ae690f4
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/util/TupleKeyComparator.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage.util;
+
+import org.apache.reef.io.Tuple;
+
+import java.util.Comparator;
+
+public final class TupleKeyComparator<K, V> implements
+ Comparator<Tuple<K, V>> {
+ private final Comparator<K> c;
+
+ public TupleKeyComparator(Comparator<K> c) {
+ this.c = c;
+ }
+
+ @Override
+ public int compare(Tuple<K, V> o1, Tuple<K, V> o2) {
+ return c.compare(o1.getKey(), o2.getKey());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/proto/ns_protocol.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/proto/ns_protocol.proto b/lang/java/reef-io/src/main/proto/ns_protocol.proto
new file mode 100644
index 0000000..1a9db1e
--- /dev/null
+++ b/lang/java/reef-io/src/main/proto/ns_protocol.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.io.network.proto";
+option java_outer_classname = "ReefNetworkServiceProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+message NSMessagePBuf {
+ required string srcid = 2;
+ required string destid = 3;
+ repeated NSRecordPBuf msgs = 4;
+}
+
+message NSRecordPBuf {
+ required bytes data = 1;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
new file mode 100644
index 0000000..f1f1a60
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.io.network.naming.*;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.NetUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+public class NameClientTest {
+
+ static int retryCount, retryTimeout;
+
+ static {
+ Tang tang = Tang.Factory.getTang();
+ try {
+ retryCount = tang.newInjector().getNamedInstance(NameLookupClient.RetryCount.class);
+ retryTimeout = tang.newInjector().getNamedInstance(NameLookupClient.RetryTimeout.class);
+ } catch (InjectionException e1) {
+ throw new RuntimeException("Exception while trying to find default values for retryCount & Timeout", e1);
+ }
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ /**
+ * Test method for {@link org.apache.reef.io.network.naming.NameClient#close()}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public final void testClose() throws Exception {
+ IdentifierFactory factory = new StringIdentifierFactory();
+ try (NameServer server = new NameServerImpl(0, factory)) {
+ int serverPort = server.getPort();
+ try (NameClient client = new NameClient(NetUtils.getLocalAddress(), serverPort, factory, retryCount, retryTimeout,
+ new NameCache(10000))) {
+ Identifier id = factory.getNewInstance("Task1");
+ client.register(id, new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ client.unregister(id);
+ Thread.sleep(100);
+ }
+ }
+ }
+
+ /**
+ * Test method for {@link org.apache.reef.io.network.naming.NameClient#lookup()}.
+ * To check caching behavior with expireAfterAccess & expireAfterWrite
+ * Changing NameCache's pattern to expireAfterAccess causes this test to fail
+ *
+ * @throws Exception
+ */
+ @Test
+ public final void testLookup() throws Exception {
+ IdentifierFactory factory = new StringIdentifierFactory();
+ try (NameServer server = new NameServerImpl(0, factory)) {
+ int serverPort = server.getPort();
+ try (NameClient client = new NameClient(NetUtils.getLocalAddress(), serverPort, factory, retryCount, retryTimeout,
+ new NameCache(150))) {
+ Identifier id = factory.getNewInstance("Task1");
+ client.register(id, new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ client.lookup(id);// caches the entry
+ client.unregister(id);
+ Thread.sleep(100);
+ try {
+ InetSocketAddress addr = client.lookup(id);
+ Thread.sleep(100);
+ //With expireAfterAccess, the previous lookup would reset expiry to 150ms
+ //more and 100ms wait will not expire the item and will return the cached value
+ //With expireAfterWrite, the extra wait of 100 ms will expire the item
+ //resulting in NamingException and the test passes
+ addr = client.lookup(id);
+ Assert.assertNull("client.lookup(id)", addr);
+ } catch (Exception e) {
+ if (e instanceof ExecutionException) {
+ Assert.assertTrue("Execution Exception cause is instanceof NamingException", e.getCause() instanceof NamingException);
+ } else
+ throw e;
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
new file mode 100644
index 0000000..71382e2
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.*;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.NetUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming server and client test
+ */
+public class NamingTest {
+
+ private static final Logger LOG = Logger.getLogger(NamingTest.class.getName());
+ private static final int retryCount;
+ private static final int retryTimeout;
+
+ static {
+ try {
+ final Injector injector = Tang.Factory.getTang().newInjector();
+ retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class);
+ retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class);
+ } catch (final InjectionException ex) {
+ final String msg = "Exception while trying to find default values for retryCount & Timeout";
+ LOG.log(Level.SEVERE, msg, ex);
+ throw new RuntimeException(msg, ex);
+ }
+ }
+
+ @Rule
+ public final TestName name = new TestName();
+ final long TTL = 30000;
+ final IdentifierFactory factory = new StringIdentifierFactory();
+ int port;
+
+ /**
+ * NameServer and NameLookupClient test
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testNamingLookup() throws Exception {
+
+ LOG.log(Level.FINEST, this.name.getMethodName());
+
+ // names
+ final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+ idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+ // run a server
+ final NameServer server = new NameServerImpl(0, this.factory);
+ this.port = server.getPort();
+ for (final Identifier id : idToAddrMap.keySet()) {
+ server.register(id, idToAddrMap.get(id));
+ }
+
+ // run a client
+ final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
+ 10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
+
+ final Identifier id1 = this.factory.getNewInstance("task1");
+ final Identifier id2 = this.factory.getNewInstance("task2");
+
+ final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
+ InetSocketAddress addr1 = client.lookup(id1);
+ respMap.put(id1, addr1);
+ InetSocketAddress addr2 = client.lookup(id2);
+ respMap.put(id2, addr2);
+
+ for (final Identifier id : respMap.keySet()) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)});
+ }
+
+ Assert.assertTrue(isEqual(idToAddrMap, respMap));
+
+ client.close();
+ server.close();
+ }
+
+ /**
+ * Test concurrent lookups (threads share a client)
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentNamingLookup() throws Exception {
+
+ LOG.log(Level.FINEST, this.name.getMethodName());
+
+ // test it 3 times to make failure likely
+ for (int i = 0; i < 3; i++) {
+
+ LOG.log(Level.FINEST, "test {0}", i);
+
+ // names
+ final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+ idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+ idToAddrMap.put(this.factory.getNewInstance("task3"), new InetSocketAddress(NetUtils.getLocalAddress(), 7003));
+
+ // run a server
+ final NameServer server = new NameServerImpl(0, this.factory);
+ this.port = server.getPort();
+ for (final Identifier id : idToAddrMap.keySet()) {
+ server.register(id, idToAddrMap.get(id));
+ }
+
+ // run a client
+ final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
+ 10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
+
+ final Identifier id1 = this.factory.getNewInstance("task1");
+ final Identifier id2 = this.factory.getNewInstance("task2");
+ final Identifier id3 = this.factory.getNewInstance("task3");
+
+ final ExecutorService e = Executors.newCachedThreadPool();
+
+ final ConcurrentMap<Identifier, InetSocketAddress> respMap = new ConcurrentHashMap<Identifier, InetSocketAddress>();
+
+ final Future<?> f1 = e.submit(new Runnable() {
+ @Override
+ public void run() {
+ InetSocketAddress addr = null;
+ try {
+ addr = client.lookup(id1);
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Lookup failed", e);
+ Assert.fail(e.toString());
+ }
+ respMap.put(id1, addr);
+ }
+ });
+ final Future<?> f2 = e.submit(new Runnable() {
+ @Override
+ public void run() {
+ InetSocketAddress addr = null;
+ try {
+ addr = client.lookup(id2);
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Lookup failed", e);
+ Assert.fail(e.toString());
+ }
+ respMap.put(id2, addr);
+ }
+ });
+ final Future<?> f3 = e.submit(new Runnable() {
+ @Override
+ public void run() {
+ InetSocketAddress addr = null;
+ try {
+ addr = client.lookup(id3);
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Lookup failed", e);
+ Assert.fail(e.toString());
+ }
+ respMap.put(id3, addr);
+ }
+ });
+
+ f1.get();
+ f2.get();
+ f3.get();
+
+ for (final Identifier id : respMap.keySet()) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)});
+ }
+
+ Assert.assertTrue(isEqual(idToAddrMap, respMap));
+
+ client.close();
+ server.close();
+ }
+ }
+
+ /**
+ * NameServer and NameRegistryClient test
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testNamingRegistry() throws Exception {
+
+ LOG.log(Level.FINEST, this.name.getMethodName());
+
+ final NameServer server = new NameServerImpl(0, this.factory);
+ this.port = server.getPort();
+
+ // names to start with
+ final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+ idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+ // registration
+ // invoke registration from the client side
+ final NameRegistryClient client = new NameRegistryClient(
+ NetUtils.getLocalAddress(), this.port, this.factory);
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.register(id, idToAddrMap.get(id));
+ }
+
+ // wait
+ final Set<Identifier> ids = idToAddrMap.keySet();
+ busyWait(server, ids.size(), ids);
+
+ // check the server side
+ Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>();
+ Iterable<NameAssignment> nas = server.lookup(ids);
+
+ for (final NameAssignment na : nas) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
+ new Object[]{na.getIdentifier(), na.getAddress()});
+ serverMap.put(na.getIdentifier(), na.getAddress());
+ }
+
+ Assert.assertTrue(isEqual(idToAddrMap, serverMap));
+
+ // un-registration
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.unregister(id);
+ }
+
+ // wait
+ busyWait(server, 0, ids);
+
+ serverMap = new HashMap<Identifier, InetSocketAddress>();
+ nas = server.lookup(ids);
+ for (final NameAssignment na : nas)
+ serverMap.put(na.getIdentifier(), na.getAddress());
+
+ Assert.assertEquals(0, serverMap.size());
+
+ client.close();
+ server.close();
+ }
+
+ /**
+ * NameServer and NameClient test
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testNameClient() throws Exception {
+
+ LOG.log(Level.FINEST, this.name.getMethodName());
+
+ final NameServer server = new NameServerImpl(0, this.factory);
+ this.port = server.getPort();
+
+ final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+ idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+ // registration
+ // invoke registration from the client side
+ final NameClient client = new NameClient(NetUtils.getLocalAddress(), this.port,
+ this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.register(id, idToAddrMap.get(id));
+ }
+
+ // wait
+ final Set<Identifier> ids = idToAddrMap.keySet();
+ busyWait(server, ids.size(), ids);
+
+ // lookup
+ final Identifier id1 = this.factory.getNewInstance("task1");
+ final Identifier id2 = this.factory.getNewInstance("task2");
+
+ final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
+ InetSocketAddress addr1 = client.lookup(id1);
+ respMap.put(id1, addr1);
+ InetSocketAddress addr2 = client.lookup(id2);
+ respMap.put(id2, addr2);
+
+ for (final Identifier id : respMap.keySet()) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)});
+ }
+
+ Assert.assertTrue(isEqual(idToAddrMap, respMap));
+
+ // un-registration
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.unregister(id);
+ }
+
+ // wait
+ busyWait(server, 0, ids);
+
+ final Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>();
+ addr1 = server.lookup(id1);
+ if (addr1 != null) serverMap.put(id1, addr1);
+ addr2 = server.lookup(id1);
+ if (addr2 != null) serverMap.put(id2, addr2);
+
+ Assert.assertEquals(0, serverMap.size());
+
+ client.close();
+ server.close();
+ }
+
+ private boolean isEqual(final Map<Identifier, InetSocketAddress> map1,
+ final Map<Identifier, InetSocketAddress> map2) {
+
+ if (map1.size() != map2.size()) {
+ return false;
+ }
+
+ for (final Identifier id : map1.keySet()) {
+ final InetSocketAddress addr1 = map1.get(id);
+ final InetSocketAddress addr2 = map2.get(id);
+ if (!addr1.equals(addr2)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private void busyWait(final NameServer server, final int expected, final Set<Identifier> ids) {
+ int count = 0;
+ for (; ; ) {
+ final Iterable<NameAssignment> nas = server.lookup(ids);
+ for (final @SuppressWarnings("unused") NameAssignment na : nas) {
+ ++count;
+ }
+ if (count == expected) {
+ break;
+ }
+ count = 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
new file mode 100644
index 0000000..96cf7d4
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.impl.MessagingTransportFactory;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.io.network.naming.NameServerImpl;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.services.network.util.Monitor;
+import org.apache.reef.services.network.util.StringCodec;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.NetUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.Assert;
+import java.net.InetSocketAddress;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Network service test
+ */
+public class NetworkServiceTest {
+ private static final Logger LOG = Logger.getLogger(NetworkServiceTest.class.getName());
+
+ @Rule
+ public TestName name = new TestName();
+
+ /**
+ * NetworkService messaging test
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMessagingNetworkService() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+
+ IdentifierFactory factory = new StringIdentifierFactory();
+ String nameServerAddr = NetUtils.getLocalAddress();
+
+ NameServer server = new NameServerImpl(0, factory);
+ int nameServerPort = server.getPort();
+
+ final int numMessages = 10;
+ final Monitor monitor = new Monitor();
+
+ LOG.log(Level.FINEST, "=== Test network service receiver start");
+ // network service
+ final String name2 = "task2";
+ NetworkService<String> ns2 = new NetworkService<String>(
+ factory, 0, nameServerAddr, nameServerPort,
+ new StringCodec(), new MessagingTransportFactory(),
+ new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
+ ns2.registerId(factory.getNewInstance(name2));
+ final int port2 = ns2.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
+
+ LOG.log(Level.FINEST, "=== Test network service sender start");
+ final String name1 = "task1";
+ final NetworkService<String> ns1 = new NetworkService<String>(factory, 0, nameServerAddr, nameServerPort,
+ new StringCodec(), new MessagingTransportFactory(),
+ new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
+ ns1.registerId(factory.getNewInstance(name1));
+ final int port1 = ns1.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
+
+ final Identifier destId = factory.getNewInstance(name2);
+ final Connection<String> conn = ns1.newConnection(destId);
+ try {
+ conn.open();
+ for (int count = 0; count < numMessages; ++count) {
+ conn.write("hello! " + count);
+ }
+ monitor.mwait();
+
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ }
+ conn.close();
+
+ ns1.close();
+ ns2.close();
+
+ server.close();
+ }
+
+ /**
+ * NetworkService messaging rate benchmark
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMessagingNetworkServiceRate() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+
+ IdentifierFactory factory = new StringIdentifierFactory();
+ String nameServerAddr = NetUtils.getLocalAddress();
+
+ NameServer server = new NameServerImpl(0, factory);
+ int nameServerPort = server.getPort();
+
+ final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
+
+ for (int size : messageSizes) {
+ final int numMessages = 300000 / (Math.max(1, size / 512));
+ final Monitor monitor = new Monitor();
+
+ LOG.log(Level.FINEST, "=== Test network service receiver start");
+ // network service
+ final String name2 = "task2";
+ NetworkService<String> ns2 = new NetworkService<String>(
+ factory, 0, nameServerAddr, nameServerPort,
+ new StringCodec(), new MessagingTransportFactory(),
+ new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
+ ns2.registerId(factory.getNewInstance(name2));
+ final int port2 = ns2.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
+
+ LOG.log(Level.FINEST, "=== Test network service sender start");
+ final String name1 = "task1";
+ NetworkService<String> ns1 = new NetworkService<String>(
+ factory, 0, nameServerAddr, nameServerPort,
+ new StringCodec(), new MessagingTransportFactory(),
+ new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
+ ns1.registerId(factory.getNewInstance(name1));
+ final int port1 = ns1.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
+
+ Identifier destId = factory.getNewInstance(name2);
+ Connection<String> conn = ns1.newConnection(destId);
+
+ // build the message
+ StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ String message = msb.toString();
+
+ long start = System.currentTimeMillis();
+ try {
+ for (int i = 0; i < numMessages; i++) {
+ conn.open();
+ conn.write(message);
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ }
+ long end = System.currentTimeMillis();
+ double runtime = ((double) end - start) / 1000;
+ LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numMessages / runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime);// x2 for unicode chars
+ conn.close();
+
+ ns1.close();
+ ns2.close();
+ }
+
+ server.close();
+ }
+
+ /**
+ * NetworkService messaging rate benchmark
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMessagingNetworkServiceRateDisjoint() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+
+ final IdentifierFactory factory = new StringIdentifierFactory();
+ final String nameServerAddr = NetUtils.getLocalAddress();
+
+ final NameServer server = new NameServerImpl(0, factory);
+ final int nameServerPort = server.getPort();
+
+ BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
+
+ int numThreads = 4;
+ final int size = 2000;
+ final int numMessages = 300000 / (Math.max(1, size / 512));
+ final int totalNumMessages = numMessages * numThreads;
+
+ ExecutorService e = Executors.newCachedThreadPool();
+ for (int t = 0; t < numThreads; t++) {
+ final int tt = t;
+
+ e.submit(new Runnable() {
+ public void run() {
+ try {
+ Monitor monitor = new Monitor();
+
+ LOG.log(Level.FINEST, "=== Test network service receiver start");
+ // network service
+ final String name2 = "task2-" + tt;
+ NetworkService<String> ns2 = new NetworkService<String>(
+ factory, 0, nameServerAddr, nameServerPort,
+ new StringCodec(), new MessagingTransportFactory(),
+ new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
+ ns2.registerId(factory.getNewInstance(name2));
+ final int port2 = ns2.getTransport().getListeningPort();
+ server.register(factory.getNewInstance(name2), new InetSocketAddress(nameServerAddr, port2));
+
+ LOG.log(Level.FINEST, "=== Test network service sender start");
+ final String name1 = "task1-" + tt;
+ NetworkService<String> ns1 = new NetworkService<String>(
+ factory, 0, nameServerAddr, nameServerPort,
+ new StringCodec(), new MessagingTransportFactory(),
+ new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
+ ns1.registerId(factory.getNewInstance(name1));
+ final int port1 = ns1.getTransport().getListeningPort();
+ server.register(factory.getNewInstance(name1), new InetSocketAddress(nameServerAddr, port1));
+
+ Identifier destId = factory.getNewInstance(name2);
+ Connection<String> conn = ns1.newConnection(destId);
+
+ // build the message
+ StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ String message = msb.toString();
+
+
+ try {
+ for (int i = 0; i < numMessages; i++) {
+ conn.open();
+ conn.write(message);
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ }
+ conn.close();
+
+ ns1.close();
+ ns2.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+
+ }
+ }
+ });
+ }
+
+ // start and time
+ long start = System.currentTimeMillis();
+ Object ignore = new Object();
+ for (int i = 0; i < numThreads; i++) barrier.add(ignore);
+ e.shutdown();
+ e.awaitTermination(100, TimeUnit.SECONDS);
+ long end = System.currentTimeMillis();
+
+ double runtime = ((double) end - start) / 1000;
+ LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+
+ server.close();
+ }
+
+ @Test
+ public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+
+ IdentifierFactory factory = new StringIdentifierFactory();
+ String nameServerAddr = NetUtils.getLocalAddress();
+
+ NameServer server = new NameServerImpl(0, factory);
+ int nameServerPort = server.getPort();
+
+ final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024};
+
+ for (int size : messageSizes) {
+ final int numMessages = 300000 / (Math.max(1, size / 512));
+ int numThreads = 2;
+ int totalNumMessages = numMessages * numThreads;
+ final Monitor monitor = new Monitor();
+
+ LOG.log(Level.FINEST, "=== Test network service receiver start");
+ // network service
+ final String name2 = "task2";
+ NetworkService<String> ns2 = new NetworkService<String>(
+ factory, 0, nameServerAddr, nameServerPort,
+ new StringCodec(), new MessagingTransportFactory(),
+ new MessageHandler<String>(name2, monitor, totalNumMessages), new ExceptionHandler());
+ ns2.registerId(factory.getNewInstance(name2));
+ final int port2 = ns2.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
+
+ LOG.log(Level.FINEST, "=== Test network service sender start");
+ final String name1 = "task1";
+ NetworkService<String> ns1 = new NetworkService<String>(
+ factory, 0, nameServerAddr, nameServerPort,
+ new StringCodec(), new MessagingTransportFactory(),
+ new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
+ ns1.registerId(factory.getNewInstance(name1));
+ final int port1 = ns1.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
+
+ Identifier destId = factory.getNewInstance(name2);
+ final Connection<String> conn = ns1.newConnection(destId);
+ conn.open();
+
+ // build the message
+ StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ final String message = msb.toString();
+
+ ExecutorService e = Executors.newCachedThreadPool();
+
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < numThreads; i++) {
+ e.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < numMessages; i++) {
+ conn.write(message);
+ }
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+
+ e.shutdown();
+ e.awaitTermination(30, TimeUnit.SECONDS);
+ monitor.mwait();
+
+ long end = System.currentTimeMillis();
+ double runtime = ((double) end - start) / 1000;
+
+ LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+ conn.close();
+
+ ns1.close();
+ ns2.close();
+ }
+
+ server.close();
+ }
+
+ /**
+ * NetworkService messaging rate benchmark
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMessagingNetworkServiceBatchingRate() throws Exception {
+ LOG.log(Level.FINEST, name.getMethodName());
+
+ IdentifierFactory factory = new StringIdentifierFactory();
+ String nameServerAddr = NetUtils.getLocalAddress();
+
+ NameServer server = new NameServerImpl(0, factory);
+ int nameServerPort = server.getPort();
+
+ final int batchSize = 1024 * 1024;
+ final int[] messageSizes = {32, 64, 512};
+
+ for (int size : messageSizes) {
+ final int numMessages = 300 / (Math.max(1, size / 512));
+ final Monitor monitor = new Monitor();
+
+ LOG.log(Level.FINEST, "=== Test network service receiver start");
+ // network service
+ final String name2 = "task2";
+ NetworkService<String> ns2 = new NetworkService<String>(
+ factory, 0, nameServerAddr, nameServerPort,
+ new StringCodec(), new MessagingTransportFactory(),
+ new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
+ ns2.registerId(factory.getNewInstance(name2));
+ final int port2 = ns2.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
+
+ LOG.log(Level.FINEST, "=== Test network service sender start");
+ final String name1 = "task1";
+ NetworkService<String> ns1 = new NetworkService<String>(
+ factory, 0, nameServerAddr, nameServerPort,
+ new StringCodec(), new MessagingTransportFactory(),
+ new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
+ ns1.registerId(factory.getNewInstance(name1));
+ final int port1 = ns1.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
+
+ Identifier destId = factory.getNewInstance(name2);
+ Connection<String> conn = ns1.newConnection(destId);
+
+ // build the message
+ StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ String message = msb.toString();
+
+ long start = System.currentTimeMillis();
+ try {
+ for (int i = 0; i < numMessages; i++) {
+ StringBuilder sb = new StringBuilder();
+ for (int j = 0; j < batchSize / size; j++) {
+ sb.append(message);
+ }
+ conn.open();
+ conn.write(sb.toString());
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ }
+ long end = System.currentTimeMillis();
+ double runtime = ((double) end - start) / 1000;
+ long numAppMessages = numMessages * batchSize / size;
+ LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime);// x2 for unicode chars
+ conn.close();
+
+ ns1.close();
+ ns2.close();
+ }
+
+ server.close();
+ }
+
+ /**
+ * Test message handler
+ *
+ * @param <T> type
+ */
+ class MessageHandler<T> implements EventHandler<Message<T>> {
+
+ private final String name;
+ private final int expected;
+ private final Monitor monitor;
+ private AtomicInteger count = new AtomicInteger(0);
+
+ MessageHandler(String name, Monitor monitor, int expected) {
+ this.name = name;
+ this.monitor = monitor;
+ this.expected = expected;
+ }
+
+ @Override
+ public void onNext(Message<T> value) {
+ count.incrementAndGet();
+
+ //System.out.print(name + " received " + value.getData() + " from " + value.getSrcId() + " to " + value.getDestId());
+ for (T obj : value.getData()) {
+ // System.out.print(" data: " + obj);
+ }
+ //LOG.log(Level.FINEST, );
+ if (count.get() == expected) {
+ monitor.mnotify();
+ }
+ }
+ }
+
+ /**
+ * Test exception handler
+ */
+ class ExceptionHandler implements EventHandler<Exception> {
+ @Override
+ public void onNext(Exception error) {
+ System.err.println(error);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java
new file mode 100644
index 0000000..c325ab9
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import java.io.Serializable;
+
+/**
+ * Event for testing
+ */
+public class TestEvent implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private String message;
+
+ public TestEvent(String message) {
+ this.message = message;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java
new file mode 100644
index 0000000..0d96670
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class LoggingUtils {
+ public static void setLoggingLevel(Level level) {
+ Handler[] handlers = Logger.getLogger("").getHandlers();
+ ConsoleHandler ch = null;
+ for (Handler h : handlers) {
+ if (h instanceof ConsoleHandler) {
+ ch = (ConsoleHandler) h;
+ break;
+ }
+ }
+ if (ch == null) {
+ ch = new ConsoleHandler();
+ Logger.getLogger("").addHandler(ch);
+ }
+ ch.setLevel(level);
+ Logger.getLogger("").setLevel(level);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java
new file mode 100644
index 0000000..ccfb54c
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class Monitor {
+ private AtomicBoolean finished = new AtomicBoolean(false);
+
+ public void mwait() throws InterruptedException {
+ synchronized (this) {
+ while (!finished.get())
+ this.wait();
+ }
+ }
+
+ public void mnotify() {
+ synchronized (this) {
+ finished.compareAndSet(false, true);
+ this.notifyAll();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java
new file mode 100644
index 0000000..9c55976
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.wake.remote.Codec;
+
+
+public class StringCodec implements Codec<String> {
+ @Override
+ public byte[] encode(String obj) {
+ return obj.getBytes();
+ }
+
+ @Override
+ public String decode(byte[] buf) {
+ return new String(buf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java
new file mode 100644
index 0000000..575ed33
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.PeriodicEvent;
+
+public class TimeoutHandler implements EventHandler<PeriodicEvent> {
+
+ private final Monitor monitor;
+
+ public TimeoutHandler(Monitor monitor) {
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void onNext(PeriodicEvent event) {
+ monitor.mnotify();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java
new file mode 100644
index 0000000..f40b8c4
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/ExternalMapTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/ExternalMapTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/ExternalMapTest.java
new file mode 100644
index 0000000..7451ac8
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/ExternalMapTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.storage;
+
+import org.apache.reef.io.ExternalMap;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.storage.ram.CodecRamMap;
+import org.apache.reef.io.storage.ram.RamMap;
+import org.apache.reef.io.storage.ram.RamStorageService;
+import org.apache.reef.io.storage.util.IntegerCodec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+
+public class ExternalMapTest {
+ @Test
+ public void testCodecRamMap() {
+ RamStorageService ramStore = new RamStorageService();
+ Codec<Integer> c = new IntegerCodec();
+ ExternalMap<Integer> m = new CodecRamMap<>(ramStore, c);
+ genericTest(m);
+ }
+
+ @Test
+ public void testRamMap() {
+ RamStorageService ramStore = new RamStorageService();
+ ExternalMap<Integer> m = new RamMap<>(ramStore);
+ genericTest(m);
+ }
+
+
+ void genericTest(ExternalMap<Integer> m) {
+ m.put("foo", 42);
+ Map<String, Integer> smallMap = new HashMap<>();
+ smallMap.put("bar", 43);
+ smallMap.put("baz", 44);
+
+ m.putAll(smallMap);
+
+ Assert.assertEquals(44, (int) m.get("baz"));
+ Assert.assertEquals(43, (int) m.get("bar"));
+ Assert.assertEquals(42, (int) m.get("foo"));
+ Assert.assertNull(m.get("quuz"));
+
+ Assert.assertTrue(m.containsKey("bar"));
+ Assert.assertFalse(m.containsKey("quuz"));
+
+ Set<String> barBaz = new HashSet<>();
+ barBaz.add("bar");
+ barBaz.add("baz");
+ barBaz.add("quuz");
+
+ Iterable<Map.Entry<CharSequence, Integer>> it = m.getAll(barBaz);
+
+ Map<CharSequence, Integer> found = new TreeMap<>();
+
+ for (Map.Entry<CharSequence, Integer> e : it) {
+ found.put(e.getKey(), e.getValue());
+ }
+ Iterator<CharSequence> it2 = found.keySet().iterator();
+ Assert.assertTrue(it2.hasNext());
+ CharSequence s = it2.next();
+ Assert.assertEquals(s, "bar");
+ Assert.assertEquals((int) found.get(s), 43);
+ Assert.assertTrue(it2.hasNext());
+ s = it2.next();
+ Assert.assertEquals(s, "baz");
+ Assert.assertEquals((int) found.get(s), 44);
+ Assert.assertFalse(it2.hasNext());
+
+ Assert.assertEquals(44, (int) m.remove("baz"));
+ Assert.assertFalse(m.containsKey("baz"));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/FramingTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/FramingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/FramingTest.java
new file mode 100644
index 0000000..55857f9
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/storage/FramingTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.storage.FramingInputStream;
+import org.apache.reef.io.storage.FramingOutputStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+public class FramingTest {
+
+ @Test
+ public void frameRoundTripTest() throws IOException, ServiceException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
+ FramingOutputStream o = new FramingOutputStream(baos);
+ FramingOutputStream o2 = new FramingOutputStream(baos2);
+ Accumulator<byte[]> a = o2.accumulator();
+ int offset = 0;
+ for (int i = 0; i < 256; i++) {
+ byte[] b = new byte[i];
+ Arrays.fill(b, (byte) i);
+ o.write(b);
+ if (i == 255) {
+ o.close();
+ } else {
+ o.nextFrame();
+ }
+ offset += (4 + i);
+ Assert.assertEquals(offset, o.getCurrentOffset());
+ a.add(b);
+ Assert.assertEquals(offset, o2.getCurrentOffset());
+ }
+ a.close();
+ o2.close();
+ byte[] b1 = baos.toByteArray();
+ byte[] b2 = baos2.toByteArray();
+ Assert.assertArrayEquals(b1, b2);
+ FramingInputStream inA1 = new FramingInputStream(new ByteArrayInputStream(b1));
+ FramingInputStream inA2 = new FramingInputStream(new ByteArrayInputStream(b2));
+ for (int i = 0; i <= 256; i++) {
+ byte[] b = new byte[i];
+ Arrays.fill(b, (byte) i);
+ byte[] f = inA1.readFrame();
+ byte[] g = inA2.readFrame();
+ if (i == 256) {
+ Assert.assertNull(f);
+ Assert.assertNull(g);
+ } else {
+ Assert.assertArrayEquals(b, f);
+ Assert.assertArrayEquals(b, g);
+ }
+ }
+ inA2.close();
+ inA1.close();
+
+ FramingInputStream inB1 = new FramingInputStream(new ByteArrayInputStream(b1));
+ int i = 0;
+ for (byte[] bin : inB1) {
+ byte[] b = new byte[i];
+ Arrays.fill(b, (byte) i);
+ Assert.assertArrayEquals(b, bin);
+ i++;
+ }
+ Assert.assertEquals(256, i);
+ inB1.close();
+
+ FramingInputStream inB2 = new FramingInputStream(new ByteArrayInputStream(b2));
+ i = 0;
+ for (byte[] bin : inB2) {
+ byte[] b = new byte[i];
+ Arrays.fill(b, (byte) i);
+ Assert.assertArrayEquals(b, bin);
+ i++;
+ }
+ Assert.assertEquals(256, i);
+ inB2.close();
+ Assert.assertArrayEquals(b1, b2);
+ }
+
+}