Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/16 11:41:56 UTC
[27/49] ignite git commit: IGNITE-4386: Hadoop: implemented client
cleanup on protocol close. This closes #1327. This closes #1339.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b13841de
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b13841de
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b13841de
Branch: refs/heads/ignite-2.0
Commit: b13841de784a94d6a24dd6d1f934c851baad4b98
Parents: 08606bd
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 12 11:29:23 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Dec 12 11:29:23 2016 +0300
----------------------------------------------------------------------
.../IgniteHadoopClientProtocolProvider.java | 70 ++----
.../hadoop/impl/proto/HadoopClientProtocol.java | 55 +++--
.../hadoop/mapreduce/MapReduceClient.java | 147 ++++++++++++
...opClientProtocolMultipleServersSelfTest.java | 93 +++-----
.../client/HadoopClientProtocolSelfTest.java | 228 ++++++++++---------
5 files changed, 367 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
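
In short, this change replaces the provider's static cache of GridClient futures with per-cluster MapReduceClient wrappers that count their users: every protocol instance acquires its wrapper on creation, releases it on close, and the last release tears down the underlying connection. Before reading the diff, here is a minimal, self-contained sketch of that reference-counting idiom (illustrative names, not the committed API):

    import java.util.concurrent.atomic.AtomicInteger;

    /** Illustrative reference-counted handle; -1 marks a handle retired for good. */
    final class RefCountedHandle<T extends AutoCloseable> {
        private final AtomicInteger cnt = new AtomicInteger();
        private final T resource;

        RefCountedHandle(T resource) {
            this.resource = resource;
        }

        /** Returns false if the handle was already retired; the caller must create a fresh one. */
        boolean acquire() {
            while (true) {
                int cur = cnt.get();

                if (cur < 0)
                    return false; // Retired: cannot be revived.

                if (cnt.compareAndSet(cur, cur + 1))
                    return true;
            }
        }

        /** Returns true if this was the last release and the resource was closed. */
        boolean release() throws Exception {
            int cur = cnt.decrementAndGet();

            // CAS 0 -> -1 so an acquire() racing with the last release fails cleanly
            // instead of resurrecting a resource that is about to be closed.
            if (cur == 0 && cnt.compareAndSet(0, -1)) {
                resource.close();

                return true;
            }

            return false;
        }
    }
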
http://git-wip-us.apache.org/repos/asf/ignite/blob/b13841de/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
index 1efe625..920e8b7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
@@ -23,24 +23,16 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.ConnectorConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.client.GridClient;
-import org.apache.ignite.internal.client.GridClientConfiguration;
-import org.apache.ignite.internal.client.GridClientException;
-import org.apache.ignite.internal.client.GridClientFactory;
-import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
import org.apache.ignite.internal.processors.hadoop.impl.proto.HadoopClientProtocol;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.processors.hadoop.mapreduce.MapReduceClient;
import org.apache.ignite.internal.util.typedef.F;
-import static org.apache.ignite.internal.client.GridClientProtocol.TCP;
-
/**
* Ignite Hadoop client protocol provider.
@@ -50,7 +42,7 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
public static final String FRAMEWORK_NAME = "ignite";
/** Clients. */
- private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, MapReduceClient> cliMap = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override public ClientProtocol create(Configuration conf) throws IOException {
@@ -91,7 +83,12 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
/** {@inheritDoc} */
@Override public void close(ClientProtocol cliProto) throws IOException {
- // No-op.
+ if (cliProto instanceof HadoopClientProtocol) {
+ MapReduceClient cli = ((HadoopClientProtocol)cliProto).client();
+
+ if (cli.release())
+ cliMap.remove(cli.cluster(), cli);
+ }
}
/**
@@ -102,7 +99,7 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
* @return Client protocol.
* @throws IOException If failed.
*/
- private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException {
+ private ClientProtocol createProtocol(String addr, Configuration conf) throws IOException {
return new HadoopClientProtocol(conf, client(addr, Collections.singletonList(addr)));
}
@@ -114,45 +111,24 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
* @return Client.
* @throws IOException If failed.
*/
- private static GridClient client(String clusterName, Collection<String> addrs) throws IOException {
- try {
- IgniteInternalFuture<GridClient> fut = cliMap.get(clusterName);
-
- if (fut == null) {
- GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>();
-
- IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(clusterName, fut0);
+ @SuppressWarnings("unchecked")
+ private MapReduceClient client(String clusterName, Collection<String> addrs) throws IOException {
+ while (true) {
+ MapReduceClient cli = cliMap.get(clusterName);
- if (oldFut != null)
- return oldFut.get();
- else {
- GridClientConfiguration cliCfg = new GridClientConfiguration();
+ if (cli == null) {
+ cli = new MapReduceClient(clusterName, addrs);
- cliCfg.setProtocol(TCP);
- cliCfg.setServers(addrs);
- cliCfg.setMarshaller(new GridClientJdkMarshaller());
- cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day.
- cliCfg.setDaemon(true);
+ MapReduceClient oldCli = cliMap.putIfAbsent(clusterName, cli);
- try {
- GridClient cli = GridClientFactory.start(cliCfg);
-
- fut0.onDone(cli);
-
- return cli;
- }
- catch (GridClientException e) {
- fut0.onDone(e);
-
- throw new IOException("Failed to establish connection with Ignite: " + addrs, e);
- }
- }
+ if (oldCli != null)
+ cli = oldCli;
}
+
+ if (cli.acquire())
+ return cli;
else
- return fut.get();
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to establish connection with Ignite \u0441\u0434\u0433\u044b\u0435: " + addrs, e);
+ cliMap.remove(clusterName, cli);
}
}
}
\ No newline at end of file
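
Note the shape of the rewritten client() above: look the wrapper up, create and putIfAbsent on a miss, then try to acquire; a failed acquire means a concurrent close retired the cached entry, so the two-argument remove(key, value) evicts exactly that entry and the loop retries. The same get-or-create-or-retry loop, distilled into a self-contained sketch with an AtomicInteger standing in for MapReduceClient's usage counter:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    final class UsageCache {
        private final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();

        /** Increments the live counter for the key, evicting and replacing retired (-1) entries. */
        AtomicInteger acquire(String key) {
            while (true) {
                AtomicInteger cnt = map.computeIfAbsent(key, k -> new AtomicInteger());

                while (true) {
                    int cur = cnt.get();

                    if (cur < 0)
                        break; // Retired by a concurrent release; evict below and retry.

                    if (cnt.compareAndSet(cur, cur + 1))
                        return cnt;
                }

                // Two-argument remove: evict only the retired entry, never a fresh replacement.
                map.remove(key, cnt);
            }
        }
    }

Because MapReduceClient (see the new file below) opens its GridClient lazily, the loser of the putIfAbsent race discards only a cheap wrapper, never a live connection.
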
http://git-wip-us.apache.org/repos/asf/ignite/blob/b13841de/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
index be2aa09..7fc0e77 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.processors.hadoop.mapreduce.MapReduceClient;
import org.apache.ignite.internal.client.GridClientException;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
@@ -78,7 +78,7 @@ public class HadoopClientProtocol implements ClientProtocol {
private final Configuration conf;
/** Ignite client. */
- private volatile GridClient cli;
+ private final MapReduceClient cli;
/** Last received version. */
private long lastVer = -1;
@@ -90,9 +90,10 @@ public class HadoopClientProtocol implements ClientProtocol {
* Constructor.
*
* @param conf Configuration.
- * @param cli Ignite client.
+ * @param cli Client.
*/
- public HadoopClientProtocol(Configuration conf, GridClient cli) {
+ public HadoopClientProtocol(Configuration conf, MapReduceClient cli) {
+ assert conf != null;
assert cli != null;
this.conf = conf;
@@ -104,7 +105,7 @@ public class HadoopClientProtocol implements ClientProtocol {
try {
conf.setLong(HadoopCommonUtils.REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
- HadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null);
+ HadoopJobId jobID = execute(HadoopProtocolNextTaskIdTask.class);
conf.setLong(HadoopCommonUtils.RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
@@ -121,8 +122,8 @@ public class HadoopClientProtocol implements ClientProtocol {
try {
conf.setLong(HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
- HadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(),
- new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)));
+ HadoopJobStatus status = execute(HadoopProtocolSubmitJobTask.class,
+ jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf));
if (status == null)
throw new IOException("Failed to submit job (null status obtained): " + jobId);
@@ -157,8 +158,7 @@ public class HadoopClientProtocol implements ClientProtocol {
/** {@inheritDoc} */
@Override public void killJob(JobID jobId) throws IOException, InterruptedException {
try {
- cli.compute().execute(HadoopProtocolKillJobTask.class.getName(),
- new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+ execute(HadoopProtocolKillJobTask.class, jobId.getJtIdentifier(), jobId.getId());
}
catch (GridClientException e) {
throw new IOException("Failed to kill job: " + jobId, e);
@@ -181,11 +181,12 @@ public class HadoopClientProtocol implements ClientProtocol {
try {
Long delay = conf.getLong(HadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1);
- HadoopProtocolTaskArguments args = delay >= 0 ?
- new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) :
- new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId());
+ HadoopJobStatus status;
- HadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args);
+ if (delay >= 0)
+ status = execute(HadoopProtocolJobStatusTask.class, jobId.getJtIdentifier(), jobId.getId(), delay);
+ else
+ status = execute(HadoopProtocolJobStatusTask.class, jobId.getJtIdentifier(), jobId.getId());
if (status == null)
throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
@@ -200,8 +201,8 @@ public class HadoopClientProtocol implements ClientProtocol {
/** {@inheritDoc} */
@Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException {
try {
- final HadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(),
- new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+ final HadoopCounters counters = execute(HadoopProtocolJobCountersTask.class,
+ jobId.getJtIdentifier(), jobId.getId());
if (counters == null)
throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
@@ -329,6 +330,21 @@ public class HadoopClientProtocol implements ClientProtocol {
}
/**
+ * Execute task.
+ *
+ * @param taskCls Task class.
+ * @param args Arguments.
+ * @return Result.
+ * @throws IOException If failed.
+ * @throws GridClientException If failed.
+ */
+ private <T> T execute(Class taskCls, Object... args) throws IOException, GridClientException {
+ HadoopProtocolTaskArguments args0 = args != null ? new HadoopProtocolTaskArguments(args) : null;
+
+ return cli.client().compute().execute(taskCls.getName(), args0);
+ }
+
+ /**
* Process received status update.
*
* @param status Ignite status.
@@ -351,4 +367,13 @@ public class HadoopClientProtocol implements ClientProtocol {
return HadoopUtils.status(lastStatus, conf);
}
+
+ /**
+ * Gets the underlying client.
+ *
+ * @return The client.
+ */
+ public MapReduceClient client() {
+ return cli;
+ }
}
\ No newline at end of file
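
All remote calls in this file now funnel through the private execute(Class, Object...) helper, so the HadoopProtocolTaskArguments wrapper is built in exactly one place rather than at every call site, and the unchecked cast is confined to a single method. The shape of that refactoring as a hedged, stand-alone sketch (Compute and the List wrapper are simplified stand-ins for the GridClient compute API and HadoopProtocolTaskArguments):

    import java.util.Arrays;
    import java.util.List;

    /** Simplified stand-in for the compute facade the protocol talks to. */
    interface Compute {
        <T> T execute(String taskName, Object arg) throws Exception;
    }

    final class TaskFacade {
        private final Compute compute;

        TaskFacade(Compute compute) {
            this.compute = compute;
        }

        /** Mirrors HadoopClientProtocol.execute(): wrap the varargs once, centrally. */
        <T> T run(Class<?> taskCls, Object... args) throws Exception {
            List<Object> args0 = args != null ? Arrays.asList(args) : null;

            return compute.execute(taskCls.getName(), args0);
        }
    }
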
http://git-wip-us.apache.org/repos/asf/ignite/blob/b13841de/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java
new file mode 100644
index 0000000..3d52176
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.mapreduce;
+
+import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientConfiguration;
+import org.apache.ignite.internal.client.GridClientException;
+import org.apache.ignite.internal.client.GridClientFactory;
+import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.ignite.internal.client.GridClientProtocol.TCP;
+
+/**
+ * Reference-counted wrapper around the Ignite client shared by Hadoop MapReduce protocol instances.
+ */
+public class MapReduceClient {
+ /** Cluster name. */
+ private final String cluster;
+
+ /** Addresses. */
+ private final Collection<String> addrs;
+
+ /** Mutex. */
+ private final Object mux = new Object();
+
+ /** Usage counter. */
+ private final AtomicInteger cnt = new AtomicInteger();
+
+ /** Client. */
+ private volatile GridClient cli;
+
+ /**
+ * Constructor.
+ *
+ * @param cluster Cluster name.
+ * @param addrs Addresses.
+ */
+ public MapReduceClient(String cluster, Collection<String> addrs) {
+ this.cluster = cluster;
+ this.addrs = addrs;
+ }
+
+ /**
+ * @return Cluster name.
+ */
+ public String cluster() {
+ return cluster;
+ }
+
+ /**
+ * Gets the client.
+ *
+ * @return The client.
+ * @throws IOException If the client failed to start.
+ */
+ public GridClient client() throws IOException {
+ GridClient cli0 = cli;
+
+ if (cli0 == null) {
+ synchronized (mux) {
+ cli0 = cli;
+
+ if (cli0 == null) {
+ GridClientConfiguration cliCfg = new GridClientConfiguration();
+
+ cliCfg.setProtocol(TCP);
+ cliCfg.setServers(addrs);
+ cliCfg.setMarshaller(new GridClientJdkMarshaller());
+ cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day.
+ cliCfg.setDaemon(true);
+
+ try {
+ cli0 = GridClientFactory.start(cliCfg);
+
+ cli = cli0;
+ }
+ catch (GridClientException e) {
+ throw new IOException("Failed to establish connection with Ignite: " + addrs, e);
+ }
+ }
+ }
+ }
+
+ return cli0;
+ }
+
+ /**
+ * Increments usage count.
+ *
+ * @return {@code True} if succeeded and client can be used.
+ */
+ public boolean acquire() {
+ while (true) {
+ int cur = cnt.get();
+
+ if (cur < 0)
+ return false;
+
+ int next = cur + 1;
+
+ if (cnt.compareAndSet(cur, next))
+ return true;
+ }
+ }
+
+ /**
+ * Decrements the usage count and closes the client if this was the last usage.
+ *
+ * @return {@code True} if this was the last usage and the client was closed.
+ */
+ public boolean release() {
+ int cnt0 = cnt.decrementAndGet();
+
+ assert cnt0 >= 0;
+
+ if (cnt0 == 0) {
+ if (cnt.compareAndSet(0, -1)) {
+ GridClient cli0 = cli;
+
+ if (cli0 != null)
+ cli0.close();
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
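
Two concurrency patterns in the new file are worth calling out. client() starts the GridClient with classic double-checked locking over a volatile field: the fast path performs a single volatile read, and only the thread that wins the mutex runs the one-time start. The same pattern as a generic, self-contained sketch:

    import java.util.concurrent.Callable;

    /** Generic double-checked lazy initializer, the idiom used by MapReduceClient.client(). */
    final class Lazy<T> {
        private final Object mux = new Object();
        private final Callable<T> factory;
        private volatile T val;

        Lazy(Callable<T> factory) {
            this.factory = factory;
        }

        T get() throws Exception {
            T v = val; // Single volatile read on the fast path.

            if (v == null) {
                synchronized (mux) {
                    v = val;

                    if (v == null)
                        val = v = factory.call(); // Volatile write publishes the instance safely.
                }
            }

            return v;
        }
    }

The second pattern is the acquire()/release() pair shown in the sketch after the commit header: the CAS from 0 to -1 in release() is what makes a retired wrapper distinguishable from a merely idle one.
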
http://git-wip-us.apache.org/repos/asf/ignite/blob/b13841de/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
index 0805be1..a4b5e6a 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
@@ -23,8 +23,8 @@ import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@@ -40,13 +40,10 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider;
import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridServerUnreachableException;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
@@ -79,34 +76,12 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
}
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- clearClients();
- }
-
- /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
- clearClients();
-
super.afterTest();
}
- /**
- * @throws IgniteCheckedException If failed.
- */
- private void clearConnectionMap() throws IgniteCheckedException {
- ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap =
- GridTestUtils.getFieldValue(IgniteHadoopClientProtocolProvider.class, "cliMap");
-
- for(IgniteInternalFuture<GridClient> fut : cliMap.values())
- fut.get().close();
-
- cliMap.clear();
- }
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -117,18 +92,6 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
}
/**
- *
- */
- private void clearClients() {
- ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = GridTestUtils.getFieldValue(
- IgniteHadoopClientProtocolProvider.class,
- IgniteHadoopClientProtocolProvider.class,
- "cliMap");
-
- cliMap.clear();
- }
-
- /**
* @throws Exception If failed.
*/
private void beforeJob() throws Exception {
@@ -154,26 +117,31 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
private void checkJobSubmit(Configuration conf) throws Exception {
final Job job = Job.getInstance(conf);
- job.setJobName(JOB_NAME);
+ try {
+ job.setJobName(JOB_NAME);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(OutFormat.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(OutFormat.class);
- job.setMapperClass(TestMapper.class);
- job.setReducerClass(TestReducer.class);
+ job.setMapperClass(TestMapper.class);
+ job.setReducerClass(TestReducer.class);
- job.setNumReduceTasks(0);
+ job.setNumReduceTasks(0);
- FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+ FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
- job.submit();
+ job.submit();
- job.waitForCompletion(false);
+ job.waitForCompletion(false);
- assert job.getStatus().getState() == JobStatus.State.SUCCEEDED : job.getStatus().getState();
+ assert job.getStatus().getState() == JobStatus.State.SUCCEEDED : job.getStatus().getState();
+ }
+ finally {
+ job.getCluster().close();
+ }
}
/**
@@ -197,18 +165,25 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
*/
@SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
public void testSingleAddress() throws Exception {
- // Use a port other than REST_PORT so the connection fails when only this port is configured.
- restPort = REST_PORT + 1;
+ try {
+ // Use a port other than REST_PORT so the connection fails when only this port is configured.
+ restPort = REST_PORT + 1;
- startGrids(gridCount());
+ startGrids(gridCount());
- GridTestUtils.assertThrowsAnyCause(log, new Callable<Object>() {
+ GridTestUtils.assertThrowsAnyCause(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- checkJobSubmit(configSingleAddress());
- return null;
- }
- },
- GridServerUnreachableException.class, "Failed to connect to any of the servers in list");
+ checkJobSubmit(configSingleAddress());
+ return null;
+ }
+ },
+ GridServerUnreachableException.class, "Failed to connect to any of the servers in list");
+ }
+ finally {
+ FileSystem fs = FileSystem.get(configSingleAddress());
+
+ fs.close();
+ }
}
/**
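
The try/finally reshuffling in the tests is not cosmetic: calling job.getCluster().close() is what drives ClientProtocolProvider.close(), which in turn releases the shared MapReduceClient introduced above. In outline, the pattern the tests adopt (Job and Configuration are the Hadoop APIs already used in this diff):

    Job job = Job.getInstance(conf);

    try {
        // ... configure the job, then run it ...
        job.submit();
        job.waitForCompletion(false);
    }
    finally {
        job.getCluster().close(); // Triggers provider close() and the client release.
    }
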
http://git-wip-us.apache.org/repos/asf/ignite/blob/b13841de/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
index 1ef7dd0..7156a3d 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
@@ -50,7 +50,6 @@ import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -115,7 +114,6 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
stopAllGrids();
super.afterTestsStopped();
-// IgniteHadoopClientProtocolProvider.cliMap.clear();
}
/** {@inheritDoc} */
@@ -196,43 +194,48 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
final Job job = Job.getInstance(conf);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
+ try {
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
- job.setMapperClass(TestCountingMapper.class);
- job.setReducerClass(TestCountingReducer.class);
- job.setCombinerClass(TestCountingCombiner.class);
+ job.setMapperClass(TestCountingMapper.class);
+ job.setReducerClass(TestCountingReducer.class);
+ job.setCombinerClass(TestCountingCombiner.class);
- FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
- FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+ FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+ FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
- job.submit();
+ job.submit();
- final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
+ final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
- assertEquals(0, cntr.getValue());
+ assertEquals(0, cntr.getValue());
- cntr.increment(10);
+ cntr.increment(10);
- assertEquals(10, cntr.getValue());
+ assertEquals(10, cntr.getValue());
- // Transferring to map phase.
- setupLockFile.delete();
+ // Transferring to map phase.
+ setupLockFile.delete();
- // Transferring to reduce phase.
- mapLockFile.delete();
+ // Transferring to reduce phase.
+ mapLockFile.delete();
- job.waitForCompletion(false);
+ job.waitForCompletion(false);
- assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
+ assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
- final Counters counters = job.getCounters();
+ final Counters counters = job.getCounters();
- assertNotNull("counters cannot be null", counters);
- assertEquals("wrong counters count", 3, counters.countCounters());
- assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
- assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
- assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());
+ assertNotNull("counters cannot be null", counters);
+ assertEquals("wrong counters count", 3, counters.countCounters());
+ assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
+ assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
+ assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());
+ }
+ finally {
+ job.getCluster().close();
+ }
}
/**
@@ -304,114 +307,119 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
final Job job = Job.getInstance(conf);
- job.setJobName(JOB_NAME);
+ try {
+ job.setJobName(JOB_NAME);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
- job.setMapperClass(TestMapper.class);
- job.setReducerClass(TestReducer.class);
+ job.setMapperClass(TestMapper.class);
+ job.setReducerClass(TestReducer.class);
- if (!noCombiners)
- job.setCombinerClass(TestCombiner.class);
+ if (!noCombiners)
+ job.setCombinerClass(TestCombiner.class);
- if (noReducers)
- job.setNumReduceTasks(0);
+ if (noReducers)
+ job.setNumReduceTasks(0);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TestOutputFormat.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(TestOutputFormat.class);
- FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
- FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+ FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+ FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
- job.submit();
+ job.submit();
- JobID jobId = job.getJobID();
+ JobID jobId = job.getJobID();
- // Setup phase.
- JobStatus jobStatus = job.getStatus();
- checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
- assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
- assert jobStatus.getMapProgress() == 0.0f;
- assert jobStatus.getReduceProgress() == 0.0f;
+ // Setup phase.
+ JobStatus jobStatus = job.getStatus();
+ checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+ assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
+ assert jobStatus.getMapProgress() == 0.0f;
+ assert jobStatus.getReduceProgress() == 0.0f;
- U.sleep(2100);
+ U.sleep(2100);
- JobStatus recentJobStatus = job.getStatus();
+ JobStatus recentJobStatus = job.getStatus();
- assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() :
- "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
+ assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() : "Old="
+ + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
- // Transferring to map phase.
- setupLockFile.delete();
+ // Transferring to map phase.
+ setupLockFile.delete();
- assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- try {
- return F.eq(1.0f, job.getStatus().getSetupProgress());
+ assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ return F.eq(1.0f, job.getStatus().getSetupProgress());
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Unexpected exception.", e);
+ }
}
- catch (Exception e) {
- throw new RuntimeException("Unexpected exception.", e);
- }
- }
- }, 5000L);
+ }, 5000L);
- // Map phase.
- jobStatus = job.getStatus();
- checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
- assert jobStatus.getSetupProgress() == 1.0f;
- assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
- assert jobStatus.getReduceProgress() == 0.0f;
+ // Map phase.
+ jobStatus = job.getStatus();
+ checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+ assert jobStatus.getSetupProgress() == 1.0f;
+ assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
+ assert jobStatus.getReduceProgress() == 0.0f;
- U.sleep(2100);
+ U.sleep(2100);
- recentJobStatus = job.getStatus();
+ recentJobStatus = job.getStatus();
- assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() :
- "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress();
+ assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() : "Old=" + jobStatus.getMapProgress()
+ + ", new=" + recentJobStatus.getMapProgress();
- // Transferring to reduce phase.
- mapLockFile.delete();
+ // Transferring to reduce phase.
+ mapLockFile.delete();
- assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- try {
- return F.eq(1.0f, job.getStatus().getMapProgress());
- }
- catch (Exception e) {
- throw new RuntimeException("Unexpected exception.", e);
+ assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ return F.eq(1.0f, job.getStatus().getMapProgress());
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Unexpected exception.", e);
+ }
}
- }
- }, 5000L);
+ }, 5000L);
- if (!noReducers) {
- // Reduce phase.
- jobStatus = job.getStatus();
- checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
- assert jobStatus.getSetupProgress() == 1.0f;
- assert jobStatus.getMapProgress() == 1.0f;
- assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
+ if (!noReducers) {
+ // Reduce phase.
+ jobStatus = job.getStatus();
+ checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+ assert jobStatus.getSetupProgress() == 1.0f;
+ assert jobStatus.getMapProgress() == 1.0f;
+ assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
- // Ensure that reduces progress increases.
- U.sleep(2100);
+ // Ensure that reduces progress increases.
+ U.sleep(2100);
- recentJobStatus = job.getStatus();
+ recentJobStatus = job.getStatus();
- assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() :
- "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
+ assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() : "Old="
+ + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
- reduceLockFile.delete();
- }
+ reduceLockFile.delete();
+ }
- job.waitForCompletion(false);
+ job.waitForCompletion(false);
- jobStatus = job.getStatus();
- checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
- assert jobStatus.getSetupProgress() == 1.0f;
- assert jobStatus.getMapProgress() == 1.0f;
- assert jobStatus.getReduceProgress() == 1.0f;
+ jobStatus = job.getStatus();
+ checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
+ assert jobStatus.getSetupProgress() == 1.0f;
+ assert jobStatus.getMapProgress() == 1.0f;
+ assert jobStatus.getReduceProgress() == 1.0f;
- dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
+ dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
+ }
+ finally {
+ job.getCluster().close();
+ }
}
/**
@@ -517,7 +525,12 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
* Test Hadoop counters.
*/
public enum TestCounter {
- COUNTER1, COUNTER2, COUNTER3
+ /** */
+ COUNTER1,
+ /** */
+ COUNTER2,
+ /** */
+ COUNTER3
}
/**
@@ -535,6 +548,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
* Test combiner that counts invocations.
*/
public static class TestCountingCombiner extends TestReducer {
+ /** {@inheritDoc} */
@Override public void reduce(Text key, Iterable<IntWritable> values,
Context ctx) throws IOException, InterruptedException {
ctx.getCounter(TestCounter.COUNTER1).increment(1);
@@ -552,6 +566,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
* Test reducer that counts invocations.
*/
public static class TestCountingReducer extends TestReducer {
+ /** {@inheritDoc} */
@Override public void reduce(Text key, Iterable<IntWritable> values,
Context ctx) throws IOException, InterruptedException {
ctx.getCounter(TestCounter.COUNTER1).increment(1);
@@ -566,6 +581,9 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
// No-op.
}
+ /**
+ * Test output format.
+ */
public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> {
/** {@inheritDoc} */
@Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx)