You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ch...@apache.org on 2015/07/31 05:15:27 UTC
incubator-reef git commit: [REEF-474]: Implement LocalNameResolver
looking up local NameServer.
Repository: incubator-reef
Updated Branches:
refs/heads/master 1da62700f -> 84feeaf6a
[REEF-474]: Implement LocalNameResolver looking up local NameServer.
This pull request addressed the issue by
* Implementing `LocalNameResolverImpl` looking up local `NameServer`.
JIRA: [REEF-474](https://issues.apache.org/jira/browse/REEF-474)
Pull Request:
This closes #294
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/84feeaf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/84feeaf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/84feeaf6
Branch: refs/heads/master
Commit: 84feeaf6a3179b0b3307ca6ac8b126446c87f016
Parents: 1da6270
Author: taegeonum <ta...@gmail.com>
Authored: Mon Jul 13 12:00:32 2015 +0900
Committer: Brian Cho <ch...@apache.org>
Committed: Fri Jul 31 11:57:40 2015 +0900
----------------------------------------------------------------------
.../naming/LocalNameResolverConfiguration.java | 55 +++++++++
.../network/naming/LocalNameResolverImpl.java | 123 +++++++++++++++++++
.../services/network/LocalNameResolverTest.java | 104 ++++++++++++++++
3 files changed, 282 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84feeaf6/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java
new file mode 100644
index 0000000..7713e46
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverConfiguration.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.network.naming.parameters.NameResolverCacheTimeout;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+
+/**
+ * Configuration Module Builder for LocalNameResolver.
+ */
+public final class LocalNameResolverConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * The timeout of caching lookup.
+ */
+ public static final OptionalParameter<Long> CACHE_TIMEOUT = new OptionalParameter<>();
+
+ /**
+ * The timeout of retrying connection.
+ */
+ public static final OptionalParameter<Integer> RETRY_TIMEOUT = new OptionalParameter<>();
+
+ /**
+ * The number of retrying connection.
+ */
+ public static final OptionalParameter<Integer> RETRY_COUNT = new OptionalParameter<>();
+
+ public static final ConfigurationModule CONF = new LocalNameResolverConfiguration()
+ .bindNamedParameter(NameResolverCacheTimeout.class, CACHE_TIMEOUT)
+ .bindNamedParameter(NameResolverRetryTimeout.class, RETRY_TIMEOUT)
+ .bindNamedParameter(NameResolverRetryCount.class, RETRY_COUNT)
+ .bindImplementation(NameResolver.class, LocalNameResolverImpl.class)
+ .build();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84feeaf6/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java
new file mode 100644
index 0000000..1aa3de7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/LocalNameResolverImpl.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.naming.parameters.NameResolverCacheTimeout;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.cache.Cache;
+import org.apache.reef.wake.Identifier;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Callable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * NameResolver looking up local name server.
+ * This class should be used when the NameServer is started locally.
+ */
+public final class LocalNameResolverImpl implements NameResolver {
+
+ private static final Logger LOG = Logger.getLogger(LocalNameResolverImpl.class.getName());
+
+ /**
+ * A local name server.
+ */
+ private final NameServer nameServer;
+
+ /**
+ * A cache for lookup.
+ */
+ private final Cache<Identifier, InetSocketAddress> cache;
+
+ /**
+ * Retry count for lookup.
+ */
+ private final int retryCount;
+
+ /**
+ * Retry timeout for lookup.
+ */
+ private final int retryTimeout;
+
+ @Inject
+ private LocalNameResolverImpl(
+ final NameServer nameServer,
+ @Parameter(NameResolverCacheTimeout.class) final long timeout,
+ @Parameter(NameResolverRetryCount.class) final int retryCount,
+ @Parameter(NameResolverRetryTimeout.class) final int retryTimeout) {
+ this.nameServer = nameServer;
+ this.cache = new NameCache(timeout);
+ this.retryCount = retryCount;
+ this.retryTimeout = retryTimeout;
+ }
+
+ @Override
+ public synchronized void register(final Identifier id, final InetSocketAddress address) throws NetworkException {
+ nameServer.register(id, address);
+ }
+
+ @Override
+ public synchronized void unregister(final Identifier id) throws NetworkException {
+ nameServer.unregister(id);
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+
+ @Override
+ public InetSocketAddress lookup(final Identifier id) throws Exception {
+ return cache.get(id, new Callable<InetSocketAddress>() {
+ @Override
+ public InetSocketAddress call() throws Exception {
+ final int origRetryCount = LocalNameResolverImpl.this.retryCount;
+ int retriesLeft = origRetryCount;
+ while (true) {
+ try {
+ final InetSocketAddress addr = nameServer.lookup(id);
+ if (addr == null) {
+ throw new NullPointerException();
+ } else {
+ return addr;
+ }
+ } catch (final NullPointerException e) {
+ if (retriesLeft <= 0) {
+ throw new NamingException("Cannot find " + id + " from the name server");
+ } else {
+ final int retTimeout = LocalNameResolverImpl.this.retryTimeout
+ * (origRetryCount - retriesLeft + 1);
+ LOG.log(Level.WARNING,
+ "Caught Naming Exception while looking up " + id
+ + " with Name Server. Will retry " + retriesLeft
+ + " time(s) after waiting for " + retTimeout + " msec.");
+ Thread.sleep(retTimeout * retriesLeft);
+ --retriesLeft;
+ }
+ }
+ }
+ }
+ });
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84feeaf6/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java
new file mode 100644
index 0000000..47c8700
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.io.network.naming.LocalNameResolverConfiguration;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+public class LocalNameResolverTest {
+
+ private final LocalAddressProvider localAddressProvider;
+
+ public LocalNameResolverTest() throws InjectionException {
+ this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+ }
+
+ /**
+ * Test method for {@link org.apache.reef.io.network.naming.LocalNameResolverImpl#close()}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public final void testClose() throws Exception {
+ final String localAddress = localAddressProvider.getLocalAddress();
+ final IdentifierFactory factory = new StringIdentifierFactory();
+ try (final NameResolver resolver = Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF
+ .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 10000)
+ .build()).getInstance(NameResolver.class)) {
+ final Identifier id = factory.getNewInstance("Task1");
+ resolver.register(id, new InetSocketAddress(localAddress, 7001));
+ resolver.unregister(id);
+ Thread.sleep(100);
+ }
+ }
+
+ /**
+ * Test method for {@link org.apache.reef.io.network.naming.LocalNameResolverImpl#lookup(Identifier id)}.
+ * To check caching behavior with expireAfterAccess & expireAfterWrite
+ * Changing NameCache's pattern to expireAfterAccess causes this test to fail
+ *
+ * @throws Exception
+ */
+ @Test
+ public final void testLookup() throws Exception {
+ final IdentifierFactory factory = new StringIdentifierFactory();
+ final String localAddress = localAddressProvider.getLocalAddress();
+ try (final NameResolver resolver = Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF
+ .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 150)
+ .build()).getInstance(NameResolver.class)) {
+ final Identifier id = factory.getNewInstance("Task1");
+ final InetSocketAddress socketAddr = new InetSocketAddress(localAddress, 7001);
+ resolver.register(id, socketAddr);
+ InetSocketAddress lookupAddr = resolver.lookup(id); // caches the entry
+ Assert.assertTrue(socketAddr.equals(lookupAddr));
+ resolver.unregister(id);
+ Thread.sleep(100);
+ try {
+ lookupAddr = resolver.lookup(id);
+ Thread.sleep(100);
+ //With expireAfterAccess, the previous lookup would reset expiry to 150ms
+ //more and 100ms wait will not expire the item and will return the cached value
+ //With expireAfterWrite, the extra wait of 100 ms will expire the item
+ //resulting in NamingException and the test passes
+ lookupAddr = resolver.lookup(id);
+ Assert.assertNull("resolver.lookup(id)", lookupAddr);
+ } catch (final Exception e) {
+ if (e instanceof ExecutionException) {
+ Assert.assertTrue("Execution Exception cause is instanceof NamingException",
+ e.getCause() instanceof NamingException);
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+}