You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/10/23 02:03:39 UTC
[08/51] [abbrv] [partial] Initial merge of Wake,
Tang and REEF into one repository and project
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/NamingTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/NamingTest.java b/reef-io/src/test/java/com/microsoft/reef/services/network/NamingTest.java
deleted file mode 100644
index e83fd27..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/NamingTest.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network;
-
-import com.microsoft.reef.io.naming.NameAssignment;
-import com.microsoft.reef.io.network.naming.*;
-import com.microsoft.reef.io.network.util.StringIdentifierFactory;
-import com.microsoft.tang.Injector;
-import com.microsoft.tang.Tang;
-import com.microsoft.tang.exceptions.InjectionException;
-import com.microsoft.wake.Identifier;
-import com.microsoft.wake.IdentifierFactory;
-import com.microsoft.wake.remote.NetUtils;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Naming server and client test
- */
-public class NamingTest {
-
- private static final Logger LOG = Logger.getLogger(NamingTest.class.getName());
-
- @Rule
- public final TestName name = new TestName();
-
- final long TTL = 30000;
- int port;
- final IdentifierFactory factory = new StringIdentifierFactory();
-
- private static final int retryCount;
- private static final int retryTimeout;
-
- static {
- try {
- final Injector injector = Tang.Factory.getTang().newInjector();
- retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class);
- retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class);
- } catch (final InjectionException ex) {
- final String msg = "Exception while trying to find default values for retryCount & Timeout";
- LOG.log(Level.SEVERE, msg, ex);
- throw new RuntimeException(msg, ex);
- }
- }
-
- /**
- * NameServer and NameLookupClient test
- *
- * @throws Exception
- */
- @Test
- public void testNamingLookup() throws Exception {
-
- LOG.log(Level.FINEST, this.name.getMethodName());
-
- // names
- final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
- idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
- idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
-
- // run a server
- final NameServer server = new NameServer(0, this.factory);
- this.port = server.getPort();
- for (final Identifier id : idToAddrMap.keySet()) {
- server.register(id, idToAddrMap.get(id));
- }
-
- // run a client
- final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
- 10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
-
- final Identifier id1 = this.factory.getNewInstance("task1");
- final Identifier id2 = this.factory.getNewInstance("task2");
-
- final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
- InetSocketAddress addr1 = client.lookup(id1);
- respMap.put(id1, addr1);
- InetSocketAddress addr2 = client.lookup(id2);
- respMap.put(id2, addr2);
-
- for (final Identifier id : respMap.keySet()) {
- LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[] { id, respMap.get(id) });
- }
-
- Assert.assertTrue(isEqual(idToAddrMap, respMap));
-
- client.close();
- server.close();
- }
-
- /**
- * Test concurrent lookups (threads share a client)
- *
- * @throws Exception
- */
- @Test
- public void testConcurrentNamingLookup() throws Exception {
-
- LOG.log(Level.FINEST, this.name.getMethodName());
-
- // test it 3 times to make failure likely
- for (int i = 0; i < 3; i++) {
-
- LOG.log(Level.FINEST, "test {0}", i);
-
- // names
- final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
- idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
- idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
- idToAddrMap.put(this.factory.getNewInstance("task3"), new InetSocketAddress(NetUtils.getLocalAddress(), 7003));
-
- // run a server
- final NameServer server = new NameServer(0, this.factory);
- this.port = server.getPort();
- for (final Identifier id : idToAddrMap.keySet()) {
- server.register(id, idToAddrMap.get(id));
- }
-
- // run a client
- final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
- 10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
-
- final Identifier id1 = this.factory.getNewInstance("task1");
- final Identifier id2 = this.factory.getNewInstance("task2");
- final Identifier id3 = this.factory.getNewInstance("task3");
-
- final ExecutorService e = Executors.newCachedThreadPool();
-
- final ConcurrentMap<Identifier, InetSocketAddress> respMap = new ConcurrentHashMap<Identifier, InetSocketAddress>();
-
- final Future<?> f1 = e.submit(new Runnable() {
- @Override
- public void run() {
- InetSocketAddress addr = null;
- try {
- addr = client.lookup(id1);
- } catch (final Exception e) {
- LOG.log(Level.SEVERE, "Lookup failed", e);
- Assert.fail(e.toString());
- }
- respMap.put(id1, addr);
- }
- });
- final Future<?> f2 = e.submit(new Runnable() {
- @Override
- public void run() {
- InetSocketAddress addr = null;
- try {
- addr = client.lookup(id2);
- } catch (final Exception e) {
- LOG.log(Level.SEVERE, "Lookup failed", e);
- Assert.fail(e.toString());
- }
- respMap.put(id2, addr);
- }
- });
- final Future<?> f3 = e.submit(new Runnable() {
- @Override
- public void run() {
- InetSocketAddress addr = null;
- try {
- addr = client.lookup(id3);
- } catch (final Exception e) {
- LOG.log(Level.SEVERE, "Lookup failed", e);
- Assert.fail(e.toString());
- }
- respMap.put(id3, addr);
- }
- });
-
- f1.get();
- f2.get();
- f3.get();
-
- for (final Identifier id : respMap.keySet()) {
- LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[] { id, respMap.get(id) });
- }
-
- Assert.assertTrue(isEqual(idToAddrMap, respMap));
-
- client.close();
- server.close();
- }
- }
-
- /**
- * NameServer and NameRegistryClient test
- *
- * @throws Exception
- */
- @Test
- public void testNamingRegistry() throws Exception {
-
- LOG.log(Level.FINEST, this.name.getMethodName());
-
- final NameServer server = new NameServer(0, this.factory);
- this.port = server.getPort();
-
- // names to start with
- final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
- idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
- idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
-
- // registration
- // invoke registration from the client side
- final NameRegistryClient client = new NameRegistryClient(
- NetUtils.getLocalAddress(), this.port, this.factory);
- for (final Identifier id : idToAddrMap.keySet()) {
- client.register(id, idToAddrMap.get(id));
- }
-
- // wait
- final Set<Identifier> ids = idToAddrMap.keySet();
- busyWait(server, ids.size(), ids);
-
- // check the server side
- Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>();
- Iterable<NameAssignment> nas = server.lookup(ids);
-
- for (final NameAssignment na : nas) {
- LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
- new Object[] { na.getIdentifier(), na.getAddress() });
- serverMap.put(na.getIdentifier(), na.getAddress());
- }
-
- Assert.assertTrue(isEqual(idToAddrMap, serverMap));
-
- // un-registration
- for (final Identifier id : idToAddrMap.keySet()) {
- client.unregister(id);
- }
-
- // wait
- busyWait(server, 0, ids);
-
- serverMap = new HashMap<Identifier, InetSocketAddress>();
- nas = server.lookup(ids);
- for (final NameAssignment na : nas)
- serverMap.put(na.getIdentifier(), na.getAddress());
-
- Assert.assertEquals(0, serverMap.size());
-
- client.close();
- server.close();
- }
-
- /**
- * NameServer and NameClient test
- *
- * @throws Exception
- */
- @Test
- public void testNameClient() throws Exception {
-
- LOG.log(Level.FINEST, this.name.getMethodName());
-
- final NameServer server = new NameServer(0, this.factory);
- this.port = server.getPort();
-
- final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
- idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
- idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
-
- // registration
- // invoke registration from the client side
- final NameClient client = new NameClient(NetUtils.getLocalAddress(), this.port,
- this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
- for (final Identifier id : idToAddrMap.keySet()) {
- client.register(id, idToAddrMap.get(id));
- }
-
- // wait
- final Set<Identifier> ids = idToAddrMap.keySet();
- busyWait(server, ids.size(), ids);
-
- // lookup
- final Identifier id1 = this.factory.getNewInstance("task1");
- final Identifier id2 = this.factory.getNewInstance("task2");
-
- final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
- InetSocketAddress addr1 = client.lookup(id1);
- respMap.put(id1, addr1);
- InetSocketAddress addr2 = client.lookup(id2);
- respMap.put(id2, addr2);
-
- for (final Identifier id : respMap.keySet()) {
- LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[] { id, respMap.get(id) });
- }
-
- Assert.assertTrue(isEqual(idToAddrMap, respMap));
-
- // un-registration
- for (final Identifier id : idToAddrMap.keySet()) {
- client.unregister(id);
- }
-
- // wait
- busyWait(server, 0, ids);
-
- final Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>();
- addr1 = server.lookup(id1);
- if (addr1 != null) serverMap.put(id1, addr1);
- addr2 = server.lookup(id1);
- if (addr2 != null) serverMap.put(id2, addr2);
-
- Assert.assertEquals(0, serverMap.size());
-
- client.close();
- server.close();
- }
-
- private boolean isEqual(final Map<Identifier, InetSocketAddress> map1,
- final Map<Identifier, InetSocketAddress> map2) {
-
- if (map1.size() != map2.size()) {
- return false;
- }
-
- for (final Identifier id : map1.keySet()) {
- final InetSocketAddress addr1 = map1.get(id);
- final InetSocketAddress addr2 = map2.get(id);
- if (!addr1.equals(addr2)) {
- return false;
- }
- }
-
- return true;
- }
-
- private void busyWait(final NameServer server, final int expected, final Set<Identifier> ids) {
- int count = 0;
- for (;;) {
- final Iterable<NameAssignment> nas = server.lookup(ids);
- for (final @SuppressWarnings("unused") NameAssignment na : nas) {
- ++count;
- }
- if (count == expected) {
- break;
- }
- count = 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/NetworkServiceTest.java b/reef-io/src/test/java/com/microsoft/reef/services/network/NetworkServiceTest.java
deleted file mode 100644
index c1a156a..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/NetworkServiceTest.java
+++ /dev/null
@@ -1,489 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network;
-
-import com.microsoft.reef.exception.evaluator.NetworkException;
-import com.microsoft.reef.io.network.Connection;
-import com.microsoft.reef.io.network.Message;
-import com.microsoft.reef.io.network.impl.MessagingTransportFactory;
-import com.microsoft.reef.io.network.impl.NetworkService;
-import com.microsoft.reef.io.network.naming.NameServer;
-import com.microsoft.reef.io.network.util.StringIdentifierFactory;
-import com.microsoft.reef.services.network.util.Monitor;
-import com.microsoft.reef.services.network.util.StringCodec;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.Identifier;
-import com.microsoft.wake.IdentifierFactory;
-import com.microsoft.wake.remote.NetUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Network service test
- */
-public class NetworkServiceTest {
- private static final Logger LOG = Logger.getLogger(NetworkServiceTest.class.getName());
-
- @Rule
- public TestName name = new TestName();
-
- /**
- * NetworkService messaging test
- *
- * @throws Exception
- */
- @Test
- public void testMessagingNetworkService() throws Exception {
- LOG.log(Level.FINEST, name.getMethodName());
-
- IdentifierFactory factory = new StringIdentifierFactory();
- String nameServerAddr = NetUtils.getLocalAddress();
-
- NameServer server = new NameServer(0, factory);
- int nameServerPort = server.getPort();
-
- final int numMessages = 10;
- final Monitor monitor = new Monitor();
-
- LOG.log(Level.FINEST, "=== Test network service receiver start");
- // network service
- final String name2 = "task2";
- NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, nameServerAddr, nameServerPort,
- new StringCodec(), new MessagingTransportFactory(),
- new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
- ns2.registerId(factory.getNewInstance(name2));
- final int port2 = ns2.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
-
- LOG.log(Level.FINEST, "=== Test network service sender start");
- final String name1 = "task1";
- final NetworkService<String> ns1 = new NetworkService<String>(factory, 0, nameServerAddr, nameServerPort,
- new StringCodec(), new MessagingTransportFactory(),
- new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
- ns1.registerId(factory.getNewInstance(name1));
- final int port1 = ns1.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
-
- final Identifier destId = factory.getNewInstance(name2);
- final Connection<String> conn = ns1.newConnection(destId);
- try {
- conn.open();
- for (int count = 0; count < numMessages; ++count) {
- conn.write("hello! " + count);
- }
- monitor.mwait();
-
- } catch (NetworkException e) {
- e.printStackTrace();
- }
- conn.close();
-
- ns1.close();
- ns2.close();
-
- server.close();
- }
-
- /**
- * NetworkService messaging rate benchmark
- *
- * @throws Exception
- */
- @Test
- public void testMessagingNetworkServiceRate() throws Exception {
- LOG.log(Level.FINEST, name.getMethodName());
-
- IdentifierFactory factory = new StringIdentifierFactory();
- String nameServerAddr = NetUtils.getLocalAddress();
-
- NameServer server = new NameServer(0, factory);
- int nameServerPort = server.getPort();
-
- final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
-
- for (int size : messageSizes) {
- final int numMessages = 300000 / (Math.max(1, size / 512));
- final Monitor monitor = new Monitor();
-
- LOG.log(Level.FINEST, "=== Test network service receiver start");
- // network service
- final String name2 = "task2";
- NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, nameServerAddr, nameServerPort,
- new StringCodec(), new MessagingTransportFactory(),
- new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
- ns2.registerId(factory.getNewInstance(name2));
- final int port2 = ns2.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
-
- LOG.log(Level.FINEST, "=== Test network service sender start");
- final String name1 = "task1";
- NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, nameServerAddr, nameServerPort,
- new StringCodec(), new MessagingTransportFactory(),
- new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
- ns1.registerId(factory.getNewInstance(name1));
- final int port1 = ns1.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
-
- Identifier destId = factory.getNewInstance(name2);
- Connection<String> conn = ns1.newConnection(destId);
-
- // build the message
- StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- String message = msb.toString();
-
- long start = System.currentTimeMillis();
- try {
- for (int i = 0; i < numMessages; i++) {
- conn.open();
- conn.write(message);
- }
- monitor.mwait();
- } catch (NetworkException e) {
- e.printStackTrace();
- }
- long end = System.currentTimeMillis();
- double runtime = ((double) end - start) / 1000;
- LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numMessages / runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime);// x2 for unicode chars
- conn.close();
-
- ns1.close();
- ns2.close();
- }
-
- server.close();
- }
-
- /**
- * NetworkService messaging rate benchmark
- *
- * @throws Exception
- */
- @Test
- public void testMessagingNetworkServiceRateDisjoint() throws Exception {
- LOG.log(Level.FINEST, name.getMethodName());
-
- final IdentifierFactory factory = new StringIdentifierFactory();
- final String nameServerAddr = NetUtils.getLocalAddress();
-
- final NameServer server = new NameServer(0, factory);
- final int nameServerPort = server.getPort();
-
- BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
-
- int numThreads = 4;
- final int size = 2000;
- final int numMessages = 300000 / (Math.max(1, size / 512));
- final int totalNumMessages = numMessages * numThreads;
-
- ExecutorService e = Executors.newCachedThreadPool();
- for (int t = 0; t < numThreads; t++) {
- final int tt = t;
-
- e.submit(new Runnable() {
- public void run() {
- try {
- Monitor monitor = new Monitor();
-
- LOG.log(Level.FINEST, "=== Test network service receiver start");
- // network service
- final String name2 = "task2-" + tt;
- NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, nameServerAddr, nameServerPort,
- new StringCodec(), new MessagingTransportFactory(),
- new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
- ns2.registerId(factory.getNewInstance(name2));
- final int port2 = ns2.getTransport().getListeningPort();
- server.register(factory.getNewInstance(name2), new InetSocketAddress(nameServerAddr, port2));
-
- LOG.log(Level.FINEST, "=== Test network service sender start");
- final String name1 = "task1-" + tt;
- NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, nameServerAddr, nameServerPort,
- new StringCodec(), new MessagingTransportFactory(),
- new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
- ns1.registerId(factory.getNewInstance(name1));
- final int port1 = ns1.getTransport().getListeningPort();
- server.register(factory.getNewInstance(name1), new InetSocketAddress(nameServerAddr, port1));
-
- Identifier destId = factory.getNewInstance(name2);
- Connection<String> conn = ns1.newConnection(destId);
-
- // build the message
- StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- String message = msb.toString();
-
-
- try {
- for (int i = 0; i < numMessages; i++) {
- conn.open();
- conn.write(message);
- }
- monitor.mwait();
- } catch (NetworkException e) {
- e.printStackTrace();
- }
- conn.close();
-
- ns1.close();
- ns2.close();
- } catch (Exception e) {
- e.printStackTrace();
-
- }
- }
- });
- }
-
- // start and time
- long start = System.currentTimeMillis();
- Object ignore = new Object();
- for (int i = 0; i < numThreads; i++) barrier.add(ignore);
- e.shutdown();
- e.awaitTermination(100, TimeUnit.SECONDS);
- long end = System.currentTimeMillis();
-
- double runtime = ((double) end - start) / 1000;
- LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
-
- server.close();
- }
-
- @Test
- public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws Exception {
- LOG.log(Level.FINEST, name.getMethodName());
-
- IdentifierFactory factory = new StringIdentifierFactory();
- String nameServerAddr = NetUtils.getLocalAddress();
-
- NameServer server = new NameServer(0, factory);
- int nameServerPort = server.getPort();
-
- final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024};
-
- for (int size : messageSizes) {
- final int numMessages = 300000 / (Math.max(1, size / 512));
- int numThreads = 2;
- int totalNumMessages = numMessages * numThreads;
- final Monitor monitor = new Monitor();
-
- LOG.log(Level.FINEST, "=== Test network service receiver start");
- // network service
- final String name2 = "task2";
- NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, nameServerAddr, nameServerPort,
- new StringCodec(), new MessagingTransportFactory(),
- new MessageHandler<String>(name2, monitor, totalNumMessages), new ExceptionHandler());
- ns2.registerId(factory.getNewInstance(name2));
- final int port2 = ns2.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
-
- LOG.log(Level.FINEST, "=== Test network service sender start");
- final String name1 = "task1";
- NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, nameServerAddr, nameServerPort,
- new StringCodec(), new MessagingTransportFactory(),
- new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
- ns1.registerId(factory.getNewInstance(name1));
- final int port1 = ns1.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
-
- Identifier destId = factory.getNewInstance(name2);
- final Connection<String> conn = ns1.newConnection(destId);
- conn.open();
-
- // build the message
- StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- final String message = msb.toString();
-
- ExecutorService e = Executors.newCachedThreadPool();
-
- long start = System.currentTimeMillis();
- for (int i = 0; i < numThreads; i++) {
- e.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- for (int i = 0; i < numMessages; i++) {
- conn.write(message);
- }
- } catch (NetworkException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
-
- e.shutdown();
- e.awaitTermination(30, TimeUnit.SECONDS);
- monitor.mwait();
-
- long end = System.currentTimeMillis();
- double runtime = ((double) end - start) / 1000;
-
- LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
- conn.close();
-
- ns1.close();
- ns2.close();
- }
-
- server.close();
- }
-
- /**
- * NetworkService messaging rate benchmark
- *
- * @throws Exception
- */
- @Test
- public void testMessagingNetworkServiceBatchingRate() throws Exception {
- LOG.log(Level.FINEST, name.getMethodName());
-
- IdentifierFactory factory = new StringIdentifierFactory();
- String nameServerAddr = NetUtils.getLocalAddress();
-
- NameServer server = new NameServer(0, factory);
- int nameServerPort = server.getPort();
-
- final int batchSize = 1024 * 1024;
- final int[] messageSizes = {32, 64, 512};
-
- for (int size : messageSizes) {
- final int numMessages = 300 / (Math.max(1, size / 512));
- final Monitor monitor = new Monitor();
-
- LOG.log(Level.FINEST, "=== Test network service receiver start");
- // network service
- final String name2 = "task2";
- NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, nameServerAddr, nameServerPort,
- new StringCodec(), new MessagingTransportFactory(),
- new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler());
- ns2.registerId(factory.getNewInstance(name2));
- final int port2 = ns2.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task2"), new InetSocketAddress(nameServerAddr, port2));
-
- LOG.log(Level.FINEST, "=== Test network service sender start");
- final String name1 = "task1";
- NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, nameServerAddr, nameServerPort,
- new StringCodec(), new MessagingTransportFactory(),
- new MessageHandler<String>(name1, null, 0), new ExceptionHandler());
- ns1.registerId(factory.getNewInstance(name1));
- final int port1 = ns1.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task1"), new InetSocketAddress(nameServerAddr, port1));
-
- Identifier destId = factory.getNewInstance(name2);
- Connection<String> conn = ns1.newConnection(destId);
-
- // build the message
- StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- String message = msb.toString();
-
- long start = System.currentTimeMillis();
- try {
- for (int i = 0; i < numMessages; i++) {
- StringBuilder sb = new StringBuilder();
- for (int j = 0; j < batchSize / size; j++) {
- sb.append(message);
- }
- conn.open();
- conn.write(sb.toString());
- }
- monitor.mwait();
- } catch (NetworkException e) {
- e.printStackTrace();
- }
- long end = System.currentTimeMillis();
- double runtime = ((double) end - start) / 1000;
- long numAppMessages = numMessages * batchSize / size;
- LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime);// x2 for unicode chars
- conn.close();
-
- ns1.close();
- ns2.close();
- }
-
- server.close();
- }
-
- /**
- * Test message handler
- *
- * @param <T> type
- */
- class MessageHandler<T> implements EventHandler<Message<T>> {
-
- private final String name;
- private final int expected;
- private final Monitor monitor;
- private AtomicInteger count = new AtomicInteger(0);
-
- MessageHandler(String name, Monitor monitor, int expected) {
- this.name = name;
- this.monitor = monitor;
- this.expected = expected;
- }
-
- @Override
- public void onNext(Message<T> value) {
- count.incrementAndGet();
-
- //System.out.print(name + " received " + value.getData() + " from " + value.getSrcId() + " to " + value.getDestId());
- for (T obj : value.getData()) {
- // System.out.print(" data: " + obj);
- }
- //LOG.log(Level.FINEST, );
- if (count.get() == expected) {
- monitor.mnotify();
- }
- }
- }
-
- /**
- * Test exception handler
- */
- class ExceptionHandler implements EventHandler<Exception> {
- @Override
- public void onNext(Exception error) {
- System.err.println(error);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/TestEvent.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/TestEvent.java b/reef-io/src/test/java/com/microsoft/reef/services/network/TestEvent.java
deleted file mode 100644
index 867a650..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/TestEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network;
-
-import java.io.Serializable;
-
-/**
- * Event for testing
- */
-public class TestEvent implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private String message;
-
- public TestEvent(String message) {
- this.message = message;
- }
-
- public String getMessage() {
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/util/LoggingUtils.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/util/LoggingUtils.java b/reef-io/src/test/java/com/microsoft/reef/services/network/util/LoggingUtils.java
deleted file mode 100644
index 43c661d..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/util/LoggingUtils.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network.util;
-
-import java.util.logging.ConsoleHandler;
-import java.util.logging.Handler;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class LoggingUtils {
- public static void setLoggingLevel(Level level) {
- Handler[] handlers = Logger.getLogger("").getHandlers();
- ConsoleHandler ch = null;
- for (Handler h : handlers) {
- if (h instanceof ConsoleHandler) {
- ch = (ConsoleHandler)h;
- break;
- }
- }
- if (ch == null) {
- ch = new ConsoleHandler();
- Logger.getLogger("").addHandler(ch);
- }
- ch.setLevel(level);
- Logger.getLogger("").setLevel(level);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/util/Monitor.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/util/Monitor.java b/reef-io/src/test/java/com/microsoft/reef/services/network/util/Monitor.java
deleted file mode 100644
index 1d7c901..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/util/Monitor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network.util;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class Monitor {
- private AtomicBoolean finished = new AtomicBoolean(false);
-
- public void mwait() throws InterruptedException {
- synchronized(this) {
- while(!finished.get())
- this.wait();
- }
- }
-
- public void mnotify() {
- synchronized(this) {
- finished.compareAndSet(false, true);
- this.notifyAll();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/util/StringCodec.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/util/StringCodec.java b/reef-io/src/test/java/com/microsoft/reef/services/network/util/StringCodec.java
deleted file mode 100644
index f22defb..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/util/StringCodec.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network.util;
-
-import com.microsoft.wake.remote.Codec;
-
-
-public class StringCodec implements Codec<String> {
- @Override
- public byte[] encode(String obj) {
- return obj.getBytes();
- }
-
- @Override
- public String decode(byte[] buf) {
- return new String(buf);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/util/TimeoutHandler.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/util/TimeoutHandler.java b/reef-io/src/test/java/com/microsoft/reef/services/network/util/TimeoutHandler.java
deleted file mode 100644
index 186797c..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/util/TimeoutHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network.util;
-
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.impl.PeriodicEvent;
-
-public class TimeoutHandler implements EventHandler<PeriodicEvent> {
-
- private final Monitor monitor;
-
- public TimeoutHandler(Monitor monitor) {
- this.monitor = monitor;
- }
-
- @Override
- public void onNext(PeriodicEvent event) {
- monitor.mnotify();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/network/util/package-info.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/network/util/package-info.java b/reef-io/src/test/java/com/microsoft/reef/services/network/util/package-info.java
deleted file mode 100644
index d806bb4..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/network/util/package-info.java
+++ /dev/null
@@ -1,16 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.network.util;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/ExternalMapTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/ExternalMapTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/ExternalMapTest.java
deleted file mode 100644
index 04b41f6..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/ExternalMapTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.io.ExternalMap;
-import com.microsoft.reef.io.serialization.Codec;
-import com.microsoft.reef.io.storage.ram.CodecRamMap;
-import com.microsoft.reef.io.storage.ram.RamMap;
-import com.microsoft.reef.io.storage.ram.RamStorageService;
-import com.microsoft.reef.io.storage.util.IntegerCodec;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.*;
-
-
-public class ExternalMapTest {
- @Test
- public void testCodecRamMap() {
- RamStorageService ramStore = new RamStorageService();
- Codec<Integer> c = new IntegerCodec();
- ExternalMap<Integer> m = new CodecRamMap<>(ramStore, c);
- genericTest(m);
- }
- @Test
- public void testRamMap() {
- RamStorageService ramStore = new RamStorageService();
- ExternalMap<Integer> m = new RamMap<>(ramStore);
- genericTest(m);
- }
-
-
- void genericTest(ExternalMap<Integer> m) {
- m.put("foo", 42);
- Map<String, Integer> smallMap = new HashMap<>();
- smallMap.put("bar", 43);
- smallMap.put("baz", 44);
-
- m.putAll(smallMap);
-
- Assert.assertEquals(44, (int)m.get("baz"));
- Assert.assertEquals(43, (int)m.get("bar"));
- Assert.assertEquals(42, (int)m.get("foo"));
- Assert.assertNull(m.get("quuz"));
-
- Assert.assertTrue(m.containsKey("bar"));
- Assert.assertFalse(m.containsKey("quuz"));
-
- Set<String> barBaz = new HashSet<>();
- barBaz.add("bar");
- barBaz.add("baz");
- barBaz.add("quuz");
-
- Iterable<Map.Entry<CharSequence,Integer>> it = m.getAll(barBaz);
-
- Map<CharSequence, Integer> found = new TreeMap<>();
-
- for(Map.Entry<CharSequence, Integer> e: it) {
- found.put(e.getKey(), e.getValue());
- }
- Iterator<CharSequence> it2 = found.keySet().iterator();
- Assert.assertTrue(it2.hasNext());
- CharSequence s = it2.next();
- Assert.assertEquals(s, "bar");
- Assert.assertEquals((int)found.get(s), 43);
- Assert.assertTrue(it2.hasNext());
- s = it2.next();
- Assert.assertEquals(s, "baz");
- Assert.assertEquals((int)found.get(s), 44);
- Assert.assertFalse(it2.hasNext());
-
- Assert.assertEquals(44, (int)m.remove("baz"));
- Assert.assertFalse(m.containsKey("baz"));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/FramingTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/FramingTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/FramingTest.java
deleted file mode 100644
index 5713732..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/FramingTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.exception.evaluator.ServiceException;
-import com.microsoft.reef.io.Accumulator;
-import com.microsoft.reef.io.storage.FramingInputStream;
-import com.microsoft.reef.io.storage.FramingOutputStream;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
-public class FramingTest {
-
- @Test
- public void frameRoundTripTest() throws IOException, ServiceException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
- FramingOutputStream o = new FramingOutputStream(baos);
- FramingOutputStream o2 = new FramingOutputStream(baos2);
- Accumulator<byte[]> a = o2.accumulator();
- int offset = 0;
- for(int i = 0; i < 256; i++ ) {
- byte[] b = new byte[i];
- Arrays.fill(b, (byte)i);
- o.write(b);
- if(i == 255) {
- o.close();
- } else {
- o.nextFrame();
- }
- offset += (4 + i);
- Assert.assertEquals(offset, o.getCurrentOffset());
- a.add(b);
- Assert.assertEquals(offset, o2.getCurrentOffset());
- }
- a.close();
- o2.close();
- byte[] b1 = baos.toByteArray();
- byte[] b2 = baos2.toByteArray();
- Assert.assertArrayEquals(b1, b2);
- FramingInputStream inA1 = new FramingInputStream(new ByteArrayInputStream(b1));
- FramingInputStream inA2 = new FramingInputStream(new ByteArrayInputStream(b2));
- for(int i = 0; i <= 256; i++ ) {
- byte[] b = new byte[i];
- Arrays.fill(b, (byte)i);
- byte[] f = inA1.readFrame();
- byte[] g = inA2.readFrame();
- if(i == 256) {
- Assert.assertNull(f);
- Assert.assertNull(g);
- } else {
- Assert.assertArrayEquals(b, f);
- Assert.assertArrayEquals(b, g);
- }
- }
- inA2.close();
- inA1.close();
-
- FramingInputStream inB1 = new FramingInputStream(new ByteArrayInputStream(b1));
- int i = 0;
- for(byte[] bin : inB1) {
- byte[] b = new byte[i];
- Arrays.fill(b, (byte)i);
- Assert.assertArrayEquals(b, bin);
- i++;
- }
- Assert.assertEquals(256, i);
- inB1.close();
-
- FramingInputStream inB2 = new FramingInputStream(new ByteArrayInputStream(b2));
- i = 0;
- for(byte[] bin : inB2) {
- byte[] b = new byte[i];
- Arrays.fill(b, (byte)i);
- Assert.assertArrayEquals(b, bin);
- i++;
- }
- Assert.assertEquals(256, i);
- inB2.close();
- Assert.assertArrayEquals(b1, b2);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/MergingIteratorTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/MergingIteratorTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/MergingIteratorTest.java
deleted file mode 100644
index 0c920c7..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/MergingIteratorTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.io.storage.MergingIterator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-
-public class MergingIteratorTest {
-
- @Test
- public void testMergingIterator() {
- Comparator<Integer> cmp = new Comparator<Integer>() {
-
- @Override
- public int compare(Integer o1, Integer o2) {
- return Integer.compare(o1, o2);
- }
- };
- @SuppressWarnings("unchecked")
- Iterator<Integer>[] its = new Iterator[]{
- Arrays.asList(new Integer[]{1, 4, 7, 10}).iterator(),
- Arrays.asList(new Integer[]{2, 5, 8, 11}).iterator(),
- Arrays.asList(new Integer[]{3, 6, 9, 12}).iterator()
- };
- MergingIterator<Integer> merge = new MergingIterator<Integer>(cmp, its);
- int i = 1;
- while (merge.hasNext()) {
- Assert.assertEquals(i, (int) merge.next());
- i++;
- }
- Assert.assertEquals(13, i);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/SortingSpoolTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/SortingSpoolTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/SortingSpoolTest.java
deleted file mode 100644
index 759a0f3..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/SortingSpoolTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.exception.evaluator.ServiceException;
-import com.microsoft.reef.io.Accumulator;
-import com.microsoft.reef.io.Spool;
-import com.microsoft.reef.io.storage.ram.SortingRamSpool;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.*;
-
-public class SortingSpoolTest {
-
- @Test
- public void testRamSpool() throws ServiceException {
- genericTest(new SortingRamSpool<Integer>(), new Comparator<Integer>() {
-
- @Override
- public int compare(Integer o1, Integer o2) {
- return Integer.compare(o1, o2);
- }
-
- });
- }
-
- @Test
- public void testRamSpoolComparator() throws ServiceException {
- Comparator<Integer> backwards = new Comparator<Integer>() {
-
- @Override
- public int compare(Integer o1, Integer o2) {
- return -1 * o1.compareTo(o2);
- }
-
- };
- genericTest(new SortingRamSpool<Integer>(backwards), backwards);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testRamSpoolAddAfterClose() throws ServiceException {
- Spool<Integer> s = new SortingRamSpool<>();
- genericAddAfterCloseTest(s);
- }
- @Test(expected = UnsupportedOperationException.class)
- public void testRamSpoolCantRemove() throws ServiceException {
- Spool<Integer> s = new SortingRamSpool<>();
- genericCantRemove(s);
- }
- @Test(expected = IllegalStateException.class)
- public void testIteratorBeforeClose() throws ServiceException {
- Spool<Integer> s = new SortingRamSpool<>();
- genericIteratorBeforeClose(s);
- }
-
- void genericTest(Spool<Integer> s, Comparator<Integer> comparator)
- throws ServiceException {
- List<Integer> l = new ArrayList<Integer>();
- Random r = new Random(42);
- while (l.size() < 100) {
- l.add(r.nextInt(75));
- }
- Accumulator<Integer> a = s.accumulator();
- for (int i = 0; i < 100; i++) {
- a.add(l.get(i));
- }
- a.close();
- List<Integer> m = new ArrayList<Integer>();
- for (int i : s) {
- m.add(i);
- }
- Integer[] sorted = l.toArray(new Integer[0]);
- Arrays.sort(sorted, 0, sorted.length, comparator);
- Integer[] shouldBeSorted = m.toArray(new Integer[0]);
- Assert.assertArrayEquals(sorted, shouldBeSorted);
- }
-
- void genericAddAfterCloseTest(Spool<?> s) throws ServiceException {
- Accumulator<?> a = s.accumulator();
- a.close();
- a.add(null);
- }
-
- void genericCantRemove(Spool<Integer> s) throws ServiceException {
- Accumulator<Integer> a = s.accumulator();
- a.add(10);
- a.close();
- Iterator<?> it = s.iterator();
- it.remove();
- }
-
- void genericIteratorBeforeClose(Spool<Integer> s) throws ServiceException {
- Accumulator<Integer> a = s.accumulator();
- a.add(10);
- s.iterator();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/SpoolFileTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/SpoolFileTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/SpoolFileTest.java
deleted file mode 100644
index 057bcf0..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/SpoolFileTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.exception.evaluator.ServiceException;
-import com.microsoft.reef.io.Accumulable;
-import com.microsoft.reef.io.Accumulator;
-import com.microsoft.reef.io.Spool;
-import com.microsoft.reef.io.serialization.Codec;
-import com.microsoft.reef.io.serialization.Deserializer;
-import com.microsoft.reef.io.serialization.Serializer;
-import com.microsoft.reef.io.storage.local.CodecFileAccumulable;
-import com.microsoft.reef.io.storage.local.CodecFileIterable;
-import com.microsoft.reef.io.storage.local.LocalStorageService;
-import com.microsoft.reef.io.storage.local.SerializerFileSpool;
-import com.microsoft.reef.io.storage.ram.RamSpool;
-import com.microsoft.reef.io.storage.ram.RamStorageService;
-import com.microsoft.reef.io.storage.util.IntegerCodec;
-import com.microsoft.tang.ConfigurationBuilder;
-import com.microsoft.tang.Tang;
-import com.microsoft.tang.exceptions.BindException;
-import com.microsoft.tang.exceptions.InjectionException;
-import com.microsoft.tang.formats.AvroConfigurationSerializer;
-import com.microsoft.tang.formats.ConfigurationModule;
-import com.microsoft.tang.formats.ConfigurationModuleBuilder;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Iterator;
-
-public class SpoolFileTest {
- public static final class RamConf extends ConfigurationModuleBuilder {
- public static final ConfigurationModule CONF = new RamConf()
- .bindImplementation(RamStorageService.class, RamStorageService.class)
- .bindImplementation(Spool.class, RamSpool.class)
- .build();
- }
-
- @Test
- public void testRam() throws BindException, InjectionException, ServiceException, IOException {
- final Tang t = Tang.Factory.getTang();
- final ConfigurationBuilder configurationBuilderOne = t.newConfigurationBuilder(RamConf.CONF.build());
-
- final AvroConfigurationSerializer serializer = new AvroConfigurationSerializer();
- final String serializedConfiguration = serializer.toString(configurationBuilderOne.build());
- final ConfigurationBuilder configurationBuilderTwo = t.newConfigurationBuilder(serializer.fromString(serializedConfiguration));
-
- @SuppressWarnings("unchecked")
- final Spool<Integer> f = (Spool<Integer>) t.newInjector(configurationBuilderTwo.build()).getInstance(
- Spool.class);
- test(f);
- }
-
- private final Serializer<Integer, OutputStream> serializer = new Serializer<Integer, OutputStream>() {
- @Override
- public Accumulable<Integer> create(final OutputStream out) {
- return new Accumulable<Integer>() {
-
- @Override
- public Accumulator<Integer> accumulator() {
- return new Accumulator<Integer>() {
-
- @Override
- public void add(Integer datum) {
- try {
- int d = datum;
- out.write(new byte[]{(byte) (d >>> 24), (byte) (d >>> 16),
- (byte) (d >>> 8), (byte) d});
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public void close() {
- try {
- out.flush();
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
- };
- }
- };
- }
- };
-
- private final Deserializer<Integer, InputStream> deserializer = new Deserializer<Integer, InputStream>() {
- @Override
- public Iterable<Integer> create(final InputStream in) {
- return new Iterable<Integer>() {
- @Override
- public Iterator<Integer> iterator() {
- Iterator<Integer> it = new Iterator<Integer>() {
- final byte[] inb = new byte[4];
- Integer nextInt;
-
- @Override
- public boolean hasNext() {
- return nextInt != null;
- }
-
- private void prime() {
- int read;
- try {
- read = in.read(inb);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- if (read != 4) {
- nextInt = null;
- } else {
- nextInt = ((inb[0] & 0xFF) << 24) + ((inb[1] & 0xFF) << 16)
- + ((inb[2] & 0xFF) << 8) + (inb[3] & 0xFF);
- }
-
- }
-
- @Override
- public Integer next() {
- Integer ret = nextInt;
- prime();
- return ret;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- it.next(); // calls prime
- return it;
- }
- };
- }
- };
-
- @Test
- public void testFile() throws ServiceException {
- LocalStorageService service = new LocalStorageService("spoolTest", "file");
- Spool<Integer> f = new SerializerFileSpool<Integer>(service, serializer,
- deserializer);
- test(f);
- service.getScratchSpace().delete();
- }
-
- @Test
- public void testInterop() throws ServiceException {
- LocalStorageService service = new LocalStorageService("spoolTest", "file");
- Codec<Integer> c = new IntegerCodec();
-
-
- CodecFileAccumulable<Integer, Codec<Integer>> f = new CodecFileAccumulable<Integer, Codec<Integer>>(
- service, c);
- CodecFileIterable<Integer, Codec<Integer>> g = new CodecFileIterable<Integer, Codec<Integer>>(
- new File(f.getName()), c);
- test(f, g);
- service.getScratchSpace().delete();
- }
-
- protected void test(Spool<Integer> f) throws ServiceException {
- test(f, f);
- }
-
- protected void test(Accumulable<Integer> f, Iterable<Integer> g) throws ServiceException {
-
- try (Accumulator<Integer> acc = f.accumulator()) {
- for (int i = 0; i < 1000; i++) {
- acc.add(i);
- }
- }
- int i = 0;
- for (int j : g) {
- Assert.assertEquals(i, j);
- i++;
- }
- Iterator<Integer> itA = g.iterator();
- Iterator<Integer> itB = g.iterator();
-
- for (i = 0; i < 1000; i++) {
- Assert.assertEquals((int) itA.next(), i);
- Assert.assertEquals((int) itB.next(), i);
- }
- Assert.assertFalse(itA.hasNext());
- Assert.assertFalse(itB.hasNext());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/com/microsoft/reef/services/storage/TupleSerializerTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/com/microsoft/reef/services/storage/TupleSerializerTest.java b/reef-io/src/test/java/com/microsoft/reef/services/storage/TupleSerializerTest.java
deleted file mode 100644
index b112271..0000000
--- a/reef-io/src/test/java/com/microsoft/reef/services/storage/TupleSerializerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.services.storage;
-
-import com.microsoft.reef.exception.evaluator.ServiceException;
-import com.microsoft.reef.io.Accumulator;
-import com.microsoft.reef.io.Tuple;
-import com.microsoft.reef.io.serialization.Deserializer;
-import com.microsoft.reef.io.serialization.Serializer;
-import com.microsoft.reef.io.storage.FramingTupleDeserializer;
-import com.microsoft.reef.io.storage.FramingTupleSerializer;
-import com.microsoft.reef.io.storage.util.IntegerDeserializer;
-import com.microsoft.reef.io.storage.util.IntegerSerializer;
-import com.microsoft.reef.io.storage.util.StringDeserializer;
-import com.microsoft.reef.io.storage.util.StringSerializer;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-public class TupleSerializerTest {
-
- private Serializer<Integer, OutputStream> keySerializer;
- private Serializer<String, OutputStream> valSerializer;
- private Deserializer<Integer, InputStream> keyDeserializer;
- private Deserializer<String, InputStream> valDeserializer;
- private FramingTupleSerializer<Integer, String> fts;
- private ByteArrayOutputStream baos;
- private FramingTupleDeserializer<Integer, String> ftd;
- private Iterable<Tuple<Integer, String>> iterable;
-
- @Before
- public void setup() throws ServiceException {
-
- keySerializer = new IntegerSerializer();
- valSerializer = new StringSerializer();
- keyDeserializer = new IntegerDeserializer();
- valDeserializer = new StringDeserializer();
-
- fts = new FramingTupleSerializer<Integer, String>(
- keySerializer, valSerializer);
-
- baos = new ByteArrayOutputStream();
- Accumulator<Tuple<Integer,String>> acc = fts.create(baos).accumulator();
- for(int i = 0; i < 100; i++) {
- acc.add(new Tuple<>(i, i+""));
- }
- acc.close();
-
- ftd = new FramingTupleDeserializer<Integer, String>(
- keyDeserializer, valDeserializer);
- iterable = ftd.create(new ByteArrayInputStream(baos.toByteArray()));
- }
-
- @Test
- public void testFramingSerializer() throws ServiceException, IOException {
- int i = 0;
- for (Tuple<Integer, String> t : iterable) {
- Tuple<Integer, String> u = new Tuple<>(i, i + "");
- Assert.assertEquals(u, t);
- i++;
- }
- Assert.assertEquals(100, i);
- }
-
- @Test(expected = NoSuchElementException.class)
- public void testReadOffEnd() {
- Iterator<Tuple<Integer, String>> it = iterable.iterator();
- try {
- while (it.hasNext()) {
- it.next();
- it.hasNext();
- }
- } catch (NoSuchElementException e) {
- throw new IllegalStateException("Errored out too early!", e);
- }
- it.next();
- }
- @Test(expected = UnsupportedOperationException.class)
- public void testCantRemove() {
- Iterator<Tuple<Integer, String>> it = iterable.iterator();
- it.next();
- it.remove();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java b/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
new file mode 100644
index 0000000..8c570bb
--- /dev/null
+++ b/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.io.network.naming.NameCache;
+import org.apache.reef.io.network.naming.NameClient;
+import org.apache.reef.io.network.naming.NameLookupClient;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.NetUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+public class NameClientTest {
+
+ static int retryCount, retryTimeout;
+
+ static {
+ Tang tang = Tang.Factory.getTang();
+ try {
+ retryCount = tang.newInjector().getNamedInstance(NameLookupClient.RetryCount.class);
+ retryTimeout = tang.newInjector().getNamedInstance(NameLookupClient.RetryTimeout.class);
+ } catch (InjectionException e1) {
+ throw new RuntimeException("Exception while trying to find default values for retryCount & Timeout", e1);
+ }
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ /**
+ * Test method for {@link org.apache.reef.io.network.naming.NameClient#close()}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public final void testClose() throws Exception {
+ IdentifierFactory factory = new StringIdentifierFactory();
+ try (NameServer server = new NameServer(0, factory)) {
+ int serverPort = server.getPort();
+ try (NameClient client = new NameClient(NetUtils.getLocalAddress(), serverPort, factory, retryCount, retryTimeout,
+ new NameCache(10000))) {
+ Identifier id = factory.getNewInstance("Task1");
+ client.register(id, new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ client.unregister(id);
+ Thread.sleep(100);
+ }
+ }
+ }
+
+ /**
+ * Test method for {@link org.apache.reef.io.network.naming.NameClient#lookup()}.
+ * To check caching behavior with expireAfterAccess & expireAfterWrite
+ * Changing NameCache's pattern to expireAfterAccess causes this test to fail
+ *
+ * @throws Exception
+ */
+ @Test
+ public final void testLookup() throws Exception {
+ IdentifierFactory factory = new StringIdentifierFactory();
+ try (NameServer server = new NameServer(0, factory)) {
+ int serverPort = server.getPort();
+ try (NameClient client = new NameClient(NetUtils.getLocalAddress(), serverPort, factory, retryCount, retryTimeout,
+ new NameCache(150))) {
+ Identifier id = factory.getNewInstance("Task1");
+ client.register(id, new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ client.lookup(id);// caches the entry
+ client.unregister(id);
+ Thread.sleep(100);
+ try {
+ InetSocketAddress addr = client.lookup(id);
+ Thread.sleep(100);
+ //With expireAfterAccess, the previous lookup would reset expiry to 150ms
+ //more and 100ms wait will not expire the item and will return the cached value
+ //With expireAfterWrite, the extra wait of 100 ms will expire the item
+ //resulting in NamingException and the test passes
+ addr = client.lookup(id);
+ Assert.assertNull("client.lookup(id)", addr);
+ } catch (Exception e) {
+ if (e instanceof ExecutionException) {
+ Assert.assertTrue("Execution Exception cause is instanceof NamingException", e.getCause() instanceof NamingException);
+ } else
+ throw e;
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
----------------------------------------------------------------------
diff --git a/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java b/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
new file mode 100644
index 0000000..a706196
--- /dev/null
+++ b/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
@@ -0,0 +1,364 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.*;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.NetUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming server and client test
+ */
+public class NamingTest {
+
+ private static final Logger LOG = Logger.getLogger(NamingTest.class.getName());
+ private static final int retryCount;
+ private static final int retryTimeout;
+
+ static {
+ try {
+ final Injector injector = Tang.Factory.getTang().newInjector();
+ retryCount = injector.getNamedInstance(NameLookupClient.RetryCount.class);
+ retryTimeout = injector.getNamedInstance(NameLookupClient.RetryTimeout.class);
+ } catch (final InjectionException ex) {
+ final String msg = "Exception while trying to find default values for retryCount & Timeout";
+ LOG.log(Level.SEVERE, msg, ex);
+ throw new RuntimeException(msg, ex);
+ }
+ }
+
+ @Rule
+ public final TestName name = new TestName();
+ final long TTL = 30000;
+ final IdentifierFactory factory = new StringIdentifierFactory();
+ int port;
+
+ /**
+ * NameServer and NameLookupClient test
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testNamingLookup() throws Exception {
+
+ LOG.log(Level.FINEST, this.name.getMethodName());
+
+ // names
+ final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+ idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+ // run a server
+ final NameServer server = new NameServer(0, this.factory);
+ this.port = server.getPort();
+ for (final Identifier id : idToAddrMap.keySet()) {
+ server.register(id, idToAddrMap.get(id));
+ }
+
+ // run a client
+ final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
+ 10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
+
+ final Identifier id1 = this.factory.getNewInstance("task1");
+ final Identifier id2 = this.factory.getNewInstance("task2");
+
+ final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
+ InetSocketAddress addr1 = client.lookup(id1);
+ respMap.put(id1, addr1);
+ InetSocketAddress addr2 = client.lookup(id2);
+ respMap.put(id2, addr2);
+
+ for (final Identifier id : respMap.keySet()) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)});
+ }
+
+ Assert.assertTrue(isEqual(idToAddrMap, respMap));
+
+ client.close();
+ server.close();
+ }
+
+ /**
+ * Test concurrent lookups (threads share a client)
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentNamingLookup() throws Exception {
+
+ LOG.log(Level.FINEST, this.name.getMethodName());
+
+ // test it 3 times to make failure likely
+ for (int i = 0; i < 3; i++) {
+
+ LOG.log(Level.FINEST, "test {0}", i);
+
+ // names
+ final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+ idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+ idToAddrMap.put(this.factory.getNewInstance("task3"), new InetSocketAddress(NetUtils.getLocalAddress(), 7003));
+
+ // run a server
+ final NameServer server = new NameServer(0, this.factory);
+ this.port = server.getPort();
+ for (final Identifier id : idToAddrMap.keySet()) {
+ server.register(id, idToAddrMap.get(id));
+ }
+
+ // run a client
+ final NameLookupClient client = new NameLookupClient(NetUtils.getLocalAddress(), this.port,
+ 10000, this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
+
+ final Identifier id1 = this.factory.getNewInstance("task1");
+ final Identifier id2 = this.factory.getNewInstance("task2");
+ final Identifier id3 = this.factory.getNewInstance("task3");
+
+ final ExecutorService e = Executors.newCachedThreadPool();
+
+ final ConcurrentMap<Identifier, InetSocketAddress> respMap = new ConcurrentHashMap<Identifier, InetSocketAddress>();
+
+ final Future<?> f1 = e.submit(new Runnable() {
+ @Override
+ public void run() {
+ InetSocketAddress addr = null;
+ try {
+ addr = client.lookup(id1);
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Lookup failed", e);
+ Assert.fail(e.toString());
+ }
+ respMap.put(id1, addr);
+ }
+ });
+ final Future<?> f2 = e.submit(new Runnable() {
+ @Override
+ public void run() {
+ InetSocketAddress addr = null;
+ try {
+ addr = client.lookup(id2);
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Lookup failed", e);
+ Assert.fail(e.toString());
+ }
+ respMap.put(id2, addr);
+ }
+ });
+ final Future<?> f3 = e.submit(new Runnable() {
+ @Override
+ public void run() {
+ InetSocketAddress addr = null;
+ try {
+ addr = client.lookup(id3);
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Lookup failed", e);
+ Assert.fail(e.toString());
+ }
+ respMap.put(id3, addr);
+ }
+ });
+
+ f1.get();
+ f2.get();
+ f3.get();
+
+ for (final Identifier id : respMap.keySet()) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)});
+ }
+
+ Assert.assertTrue(isEqual(idToAddrMap, respMap));
+
+ client.close();
+ server.close();
+ }
+ }
+
+ /**
+ * NameServer and NameRegistryClient test
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testNamingRegistry() throws Exception {
+
+ LOG.log(Level.FINEST, this.name.getMethodName());
+
+ final NameServer server = new NameServer(0, this.factory);
+ this.port = server.getPort();
+
+ // names to start with
+ final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+ idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+ // registration
+ // invoke registration from the client side
+ final NameRegistryClient client = new NameRegistryClient(
+ NetUtils.getLocalAddress(), this.port, this.factory);
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.register(id, idToAddrMap.get(id));
+ }
+
+ // wait
+ final Set<Identifier> ids = idToAddrMap.keySet();
+ busyWait(server, ids.size(), ids);
+
+ // check the server side
+ Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>();
+ Iterable<NameAssignment> nas = server.lookup(ids);
+
+ for (final NameAssignment na : nas) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
+ new Object[]{na.getIdentifier(), na.getAddress()});
+ serverMap.put(na.getIdentifier(), na.getAddress());
+ }
+
+ Assert.assertTrue(isEqual(idToAddrMap, serverMap));
+
+ // un-registration
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.unregister(id);
+ }
+
+ // wait
+ busyWait(server, 0, ids);
+
+ serverMap = new HashMap<Identifier, InetSocketAddress>();
+ nas = server.lookup(ids);
+ for (final NameAssignment na : nas)
+ serverMap.put(na.getIdentifier(), na.getAddress());
+
+ Assert.assertEquals(0, serverMap.size());
+
+ client.close();
+ server.close();
+ }
+
+ /**
+ * NameServer and NameClient test
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testNameClient() throws Exception {
+
+ LOG.log(Level.FINEST, this.name.getMethodName());
+
+ final NameServer server = new NameServer(0, this.factory);
+ this.port = server.getPort();
+
+ final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
+ idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(NetUtils.getLocalAddress(), 7001));
+ idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(NetUtils.getLocalAddress(), 7002));
+
+ // registration
+ // invoke registration from the client side
+ final NameClient client = new NameClient(NetUtils.getLocalAddress(), this.port,
+ this.factory, retryCount, retryTimeout, new NameCache(this.TTL));
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.register(id, idToAddrMap.get(id));
+ }
+
+ // wait
+ final Set<Identifier> ids = idToAddrMap.keySet();
+ busyWait(server, ids.size(), ids);
+
+ // lookup
+ final Identifier id1 = this.factory.getNewInstance("task1");
+ final Identifier id2 = this.factory.getNewInstance("task2");
+
+ final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
+ InetSocketAddress addr1 = client.lookup(id1);
+ respMap.put(id1, addr1);
+ InetSocketAddress addr2 = client.lookup(id2);
+ respMap.put(id2, addr2);
+
+ for (final Identifier id : respMap.keySet()) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)});
+ }
+
+ Assert.assertTrue(isEqual(idToAddrMap, respMap));
+
+ // un-registration
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.unregister(id);
+ }
+
+ // wait
+ busyWait(server, 0, ids);
+
+ final Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>();
+ addr1 = server.lookup(id1);
+ if (addr1 != null) serverMap.put(id1, addr1);
+ addr2 = server.lookup(id1);
+ if (addr2 != null) serverMap.put(id2, addr2);
+
+ Assert.assertEquals(0, serverMap.size());
+
+ client.close();
+ server.close();
+ }
+
+ private boolean isEqual(final Map<Identifier, InetSocketAddress> map1,
+ final Map<Identifier, InetSocketAddress> map2) {
+
+ if (map1.size() != map2.size()) {
+ return false;
+ }
+
+ for (final Identifier id : map1.keySet()) {
+ final InetSocketAddress addr1 = map1.get(id);
+ final InetSocketAddress addr2 = map2.get(id);
+ if (!addr1.equals(addr2)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private void busyWait(final NameServer server, final int expected, final Set<Identifier> ids) {
+ int count = 0;
+ for (; ; ) {
+ final Iterable<NameAssignment> nas = server.lookup(ids);
+ for (final @SuppressWarnings("unused") NameAssignment na : nas) {
+ ++count;
+ }
+ if (count == expected) {
+ break;
+ }
+ count = 0;
+ }
+ }
+}