You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ch...@apache.org on 2015/07/31 05:15:27 UTC

incubator-reef git commit: [REEF-474]: Implement LocalNameResolver looking up local NameServer.

Repository: incubator-reef
Updated Branches:
  refs/heads/master 1da62700f -> 84feeaf6a


[REEF-474]: Implement LocalNameResolver looking up local NameServer.

This pull request addressed the issue by

* Implementing `LocalNameResolverImpl` looking up local `NameServer`.

JIRA: [REEF-474](https://issues.apache.org/jira/browse/REEF-474)

Pull Request:
  This closes #294


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/84feeaf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/84feeaf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/84feeaf6

Branch: refs/heads/master
Commit: 84feeaf6a3179b0b3307ca6ac8b126446c87f016
Parents: 1da6270
Author: taegeonum <ta...@gmail.com>
Authored: Mon Jul 13 12:00:32 2015 +0900
Committer: Brian Cho <ch...@apache.org>
Committed: Fri Jul 31 11:57:40 2015 +0900

----------------------------------------------------------------------
 .../naming/LocalNameResolverConfiguration.java  |  55 +++++++++
 .../network/naming/LocalNameResolverImpl.java   | 123 +++++++++++++++++++
 .../services/network/LocalNameResolverTest.java | 104 ++++++++++++++++
 3 files changed, 282 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84feeaf6/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java
new file mode 100644
index 0000000..7713e46
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.network.naming.parameters.NameResolverCacheTimeout;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+
+/**
+ * Configuration Module Builder for LocalNameResolver.
+ */
+public final class LocalNameResolverConfiguration extends ConfigurationModuleBuilder {
+
+  /**
+   * The timeout of caching lookup.
+   */
+  public static final OptionalParameter<Long> CACHE_TIMEOUT = new OptionalParameter<>();
+
+  /**
+   * The timeout of retrying connection.
+   */
+  public static final OptionalParameter<Integer> RETRY_TIMEOUT = new OptionalParameter<>();
+
+  /**
+   * The number of retrying connection.
+   */
+  public static final OptionalParameter<Integer> RETRY_COUNT = new OptionalParameter<>();
+
+  public static final ConfigurationModule CONF = new LocalNameResolverConfiguration()
+      .bindNamedParameter(NameResolverCacheTimeout.class, CACHE_TIMEOUT)
+      .bindNamedParameter(NameResolverRetryTimeout.class, RETRY_TIMEOUT)
+      .bindNamedParameter(NameResolverRetryCount.class, RETRY_COUNT)
+      .bindImplementation(NameResolver.class, LocalNameResolverImpl.class)
+      .build();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84feeaf6/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java
new file mode 100644
index 0000000..1aa3de7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.naming.parameters.NameResolverCacheTimeout;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.cache.Cache;
+import org.apache.reef.wake.Identifier;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Callable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * NameResolver looking up local name server.
+ * This class should be used when the NameServer is started locally.
+ */
+public final class LocalNameResolverImpl implements NameResolver {
+
+  private static final Logger LOG = Logger.getLogger(LocalNameResolverImpl.class.getName());
+
+  /**
+   * A local name server.
+   */
+  private final NameServer nameServer;
+
+  /**
+   * A cache for lookup.
+   */
+  private final Cache<Identifier, InetSocketAddress> cache;
+
+  /**
+   * Retry count for lookup.
+   */
+  private final int retryCount;
+
+  /**
+   * Retry timeout for lookup.
+   */
+  private final int retryTimeout;
+
+  @Inject
+  private LocalNameResolverImpl(
+      final NameServer nameServer,
+      @Parameter(NameResolverCacheTimeout.class) final long timeout,
+      @Parameter(NameResolverRetryCount.class) final int retryCount,
+      @Parameter(NameResolverRetryTimeout.class) final int retryTimeout) {
+    this.nameServer = nameServer;
+    this.cache = new NameCache(timeout);
+    this.retryCount = retryCount;
+    this.retryTimeout = retryTimeout;
+  }
+
+  @Override
+  public synchronized void register(final Identifier id, final InetSocketAddress address) throws NetworkException {
+    nameServer.register(id, address);
+  }
+
+  @Override
+  public synchronized void unregister(final Identifier id) throws NetworkException {
+    nameServer.unregister(id);
+  }
+
+  @Override
+  public void close() throws Exception {
+  }
+
+  @Override
+  public InetSocketAddress lookup(final Identifier id) throws Exception {
+    return cache.get(id, new Callable<InetSocketAddress>() {
+      @Override
+      public InetSocketAddress call() throws Exception {
+        final int origRetryCount = LocalNameResolverImpl.this.retryCount;
+        int retriesLeft = origRetryCount;
+        while (true) {
+          try {
+            final InetSocketAddress addr = nameServer.lookup(id);
+            if (addr == null) {
+              throw new NullPointerException();
+            } else {
+              return addr;
+            }
+          } catch (final NullPointerException e) {
+            if (retriesLeft <= 0) {
+              throw new NamingException("Cannot find " + id + " from the name server");
+            } else {
+              final int retTimeout = LocalNameResolverImpl.this.retryTimeout
+                  * (origRetryCount - retriesLeft + 1);
+              LOG.log(Level.WARNING,
+                  "Caught Naming Exception while looking up " + id
+                      + " with Name Server. Will retry " + retriesLeft
+                      + " time(s) after waiting for " + retTimeout + " msec.");
+              Thread.sleep(retTimeout * retriesLeft);
+              --retriesLeft;
+            }
+          }
+        }
+      }
+    });
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84feeaf6/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java
new file mode 100644
index 0000000..47c8700
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.io.network.naming.LocalNameResolverConfiguration;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+public class LocalNameResolverTest {
+
+  private final LocalAddressProvider localAddressProvider;
+
+  public LocalNameResolverTest() throws InjectionException {
+    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+  }
+
+  /**
+   * Test method for {@link org.apache.reef.io.network.naming.LocalNameResolverImpl#close()}.
+   *
+   * @throws Exception
+   */
+  @Test
+  public final void testClose() throws Exception {
+    final String localAddress = localAddressProvider.getLocalAddress();
+    final IdentifierFactory factory = new StringIdentifierFactory();
+    try (final NameResolver resolver = Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF
+        .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 10000)
+        .build()).getInstance(NameResolver.class)) {
+      final Identifier id = factory.getNewInstance("Task1");
+      resolver.register(id, new InetSocketAddress(localAddress, 7001));
+      resolver.unregister(id);
+      Thread.sleep(100);
+    }
+  }
+
+  /**
+   * Test method for {@link org.apache.reef.io.network.naming.LocalNameResolverImpl#lookup(Identifier id)}.
+   * To check caching behavior with expireAfterAccess & expireAfterWrite
+   * Changing NameCache's pattern to expireAfterAccess causes this test to fail
+   *
+   * @throws Exception
+   */
+  @Test
+  public final void testLookup() throws Exception {
+    final IdentifierFactory factory = new StringIdentifierFactory();
+    final String localAddress = localAddressProvider.getLocalAddress();
+    try (final NameResolver resolver = Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF
+        .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 150)
+        .build()).getInstance(NameResolver.class)) {
+      final Identifier id = factory.getNewInstance("Task1");
+      final InetSocketAddress socketAddr = new InetSocketAddress(localAddress, 7001);
+      resolver.register(id, socketAddr);
+      InetSocketAddress lookupAddr = resolver.lookup(id); // caches the entry
+      Assert.assertTrue(socketAddr.equals(lookupAddr));
+      resolver.unregister(id);
+      Thread.sleep(100);
+      try {
+        lookupAddr = resolver.lookup(id);
+        Thread.sleep(100);
+        //With expireAfterAccess, the previous lookup would reset expiry to 150ms
+        //more and 100ms wait will not expire the item and will return the cached value
+        //With expireAfterWrite, the extra wait of 100 ms will expire the item
+        //resulting in NamingException and the test passes
+        lookupAddr = resolver.lookup(id);
+        Assert.assertNull("resolver.lookup(id)", lookupAddr);
+      } catch (final Exception e) {
+        if (e instanceof ExecutionException) {
+          Assert.assertTrue("Execution Exception cause is instanceof NamingException",
+              e.getCause() instanceof NamingException);
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+}