You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/15 09:21:09 UTC

[03/18] ignite git commit: IGNITE-4386: Hadoop: implemented client cleanup on protocol close. This closes #1327. This closes #1339.

IGNITE-4386: Hadoop: implemented client cleanup on protocol close. This closes #1327. This closes #1339.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b13841de
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b13841de
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b13841de

Branch: refs/heads/ignite-4371
Commit: b13841de784a94d6a24dd6d1f934c851baad4b98
Parents: 08606bd
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 12 11:29:23 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Dec 12 11:29:23 2016 +0300

----------------------------------------------------------------------
 .../IgniteHadoopClientProtocolProvider.java     |  70 ++----
 .../hadoop/impl/proto/HadoopClientProtocol.java |  55 +++--
 .../hadoop/mapreduce/MapReduceClient.java       | 147 ++++++++++++
 ...opClientProtocolMultipleServersSelfTest.java |  93 +++-----
 .../client/HadoopClientProtocolSelfTest.java    | 228 ++++++++++---------
 5 files changed, 367 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b13841de/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
index 1efe625..920e8b7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
@@ -23,24 +23,16 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.ConnectorConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.client.GridClient;
-import org.apache.ignite.internal.client.GridClientConfiguration;
-import org.apache.ignite.internal.client.GridClientException;
-import org.apache.ignite.internal.client.GridClientFactory;
-import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
 import org.apache.ignite.internal.processors.hadoop.impl.proto.HadoopClientProtocol;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.processors.hadoop.mapreduce.MapReduceClient;
 import org.apache.ignite.internal.util.typedef.F;
 
-import static org.apache.ignite.internal.client.GridClientProtocol.TCP;
-
 
 /**
  * Ignite Hadoop client protocol provider.
@@ -50,7 +42,7 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
     public static final String FRAMEWORK_NAME = "ignite";
 
     /** Clients. */
-    private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, MapReduceClient> cliMap = new ConcurrentHashMap<>();
 
     /** {@inheritDoc} */
     @Override public ClientProtocol create(Configuration conf) throws IOException {
@@ -91,7 +83,12 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
 
     /** {@inheritDoc} */
     @Override public void close(ClientProtocol cliProto) throws IOException {
-        // No-op.
+        if (cliProto instanceof HadoopClientProtocol) {
+            MapReduceClient cli = ((HadoopClientProtocol)cliProto).client();
+
+            if (cli.release())
+                cliMap.remove(cli.cluster(), cli);
+        }
     }
 
     /**
@@ -102,7 +99,7 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
      * @return Client protocol.
      * @throws IOException If failed.
      */
-    private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException {
+    private ClientProtocol createProtocol(String addr, Configuration conf) throws IOException {
         return new HadoopClientProtocol(conf, client(addr, Collections.singletonList(addr)));
     }
 
@@ -114,45 +111,24 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
      * @return Client.
      * @throws IOException If failed.
      */
-    private static GridClient client(String clusterName, Collection<String> addrs) throws IOException {
-        try {
-            IgniteInternalFuture<GridClient> fut = cliMap.get(clusterName);
-
-            if (fut == null) {
-                GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>();
-
-                IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(clusterName, fut0);
+    @SuppressWarnings("unchecked")
+    private MapReduceClient client(String clusterName, Collection<String> addrs) throws IOException {
+        while (true) {
+            MapReduceClient cli = cliMap.get(clusterName);
 
-                if (oldFut != null)
-                    return oldFut.get();
-                else {
-                    GridClientConfiguration cliCfg = new GridClientConfiguration();
+            if (cli == null) {
+                cli = new MapReduceClient(clusterName, addrs);
 
-                    cliCfg.setProtocol(TCP);
-                    cliCfg.setServers(addrs);
-                    cliCfg.setMarshaller(new GridClientJdkMarshaller());
-                    cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day.
-                    cliCfg.setDaemon(true);
+                MapReduceClient oldCli = cliMap.putIfAbsent(clusterName, cli);
 
-                    try {
-                        GridClient cli = GridClientFactory.start(cliCfg);
-
-                        fut0.onDone(cli);
-
-                        return cli;
-                    }
-                    catch (GridClientException e) {
-                        fut0.onDone(e);
-
-                        throw new IOException("Failed to establish connection with Ignite: " + addrs, e);
-                    }
-                }
+                if (oldCli != null)
+                    cli = oldCli;
             }
+
+            if (cli.acquire())
+                return cli;
             else
-                return fut.get();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to establish connection with Ignite \u0441\u0434\u0433\u044b\u0435: " + addrs, e);
+                cliMap.remove(clusterName, cli);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13841de/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
index be2aa09..7fc0e77 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.processors.hadoop.mapreduce.MapReduceClient;
 import org.apache.ignite.internal.client.GridClientException;
 import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
@@ -78,7 +78,7 @@ public class HadoopClientProtocol implements ClientProtocol {
     private final Configuration conf;
 
     /** Ignite client. */
-    private volatile GridClient cli;
+    private final MapReduceClient cli;
 
     /** Last received version. */
     private long lastVer = -1;
@@ -90,9 +90,10 @@ public class HadoopClientProtocol implements ClientProtocol {
      * Constructor.
      *
      * @param conf Configuration.
-     * @param cli Ignite client.
+     * @param cli Client.
      */
-    public HadoopClientProtocol(Configuration conf, GridClient cli) {
+    public HadoopClientProtocol(Configuration conf, MapReduceClient cli) {
+        assert conf != null;
         assert cli != null;
 
         this.conf = conf;
@@ -104,7 +105,7 @@ public class HadoopClientProtocol implements ClientProtocol {
         try {
             conf.setLong(HadoopCommonUtils.REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
 
-            HadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null);
+            HadoopJobId jobID = execute(HadoopProtocolNextTaskIdTask.class);
 
             conf.setLong(HadoopCommonUtils.RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
 
@@ -121,8 +122,8 @@ public class HadoopClientProtocol implements ClientProtocol {
         try {
             conf.setLong(HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
 
-            HadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(),
-                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)));
+            HadoopJobStatus status = execute(HadoopProtocolSubmitJobTask.class,
+                jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf));
 
             if (status == null)
                 throw new IOException("Failed to submit job (null status obtained): " + jobId);
@@ -157,8 +158,7 @@ public class HadoopClientProtocol implements ClientProtocol {
     /** {@inheritDoc} */
     @Override public void killJob(JobID jobId) throws IOException, InterruptedException {
         try {
-            cli.compute().execute(HadoopProtocolKillJobTask.class.getName(),
-                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+            execute(HadoopProtocolKillJobTask.class, jobId.getJtIdentifier(), jobId.getId());
         }
         catch (GridClientException e) {
             throw new IOException("Failed to kill job: " + jobId, e);
@@ -181,11 +181,12 @@ public class HadoopClientProtocol implements ClientProtocol {
         try {
             Long delay = conf.getLong(HadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1);
 
-            HadoopProtocolTaskArguments args = delay >= 0 ?
-                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) :
-                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId());
+            HadoopJobStatus status;
 
-            HadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args);
+            if (delay >= 0)
+                status = execute(HadoopProtocolJobStatusTask.class, jobId.getJtIdentifier(), jobId.getId(), delay);
+            else
+                status = execute(HadoopProtocolJobStatusTask.class, jobId.getJtIdentifier(), jobId.getId());
 
             if (status == null)
                 throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
@@ -200,8 +201,8 @@ public class HadoopClientProtocol implements ClientProtocol {
     /** {@inheritDoc} */
     @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException {
         try {
-            final HadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(),
-                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+            final HadoopCounters counters = execute(HadoopProtocolJobCountersTask.class,
+                jobId.getJtIdentifier(), jobId.getId());
 
             if (counters == null)
                 throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
@@ -329,6 +330,21 @@ public class HadoopClientProtocol implements ClientProtocol {
     }
 
     /**
+     * Execute task.
+     *
+     * @param taskCls Task class.
+     * @param args Arguments.
+     * @return Result.
+     * @throws IOException If failed.
+     * @throws GridClientException If failed.
+     */
+    private <T> T execute(Class taskCls, Object... args) throws IOException, GridClientException {
+        HadoopProtocolTaskArguments args0 = args != null ? new HadoopProtocolTaskArguments(args) : null;
+
+        return cli.client().compute().execute(taskCls.getName(), args0);
+    }
+
+    /**
      * Process received status update.
      *
      * @param status Ignite status.
@@ -351,4 +367,13 @@ public class HadoopClientProtocol implements ClientProtocol {
 
         return HadoopUtils.status(lastStatus, conf);
     }
+
+    /**
+     * Gets the MapReduce client wrapper used by this protocol.
+     *
+     * @return The MapReduce client.
+     */
+    public MapReduceClient client() {
+        return cli;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13841de/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java
new file mode 100644
index 0000000..3d52176
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.mapreduce;
+
+import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientConfiguration;
+import org.apache.ignite.internal.client.GridClientException;
+import org.apache.ignite.internal.client.GridClientFactory;
+import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.ignite.internal.client.GridClientProtocol.TCP;
+
+/**
+ * Client.
+ */
+public class MapReduceClient {
+    /** Cluster name. */
+    private final String cluster;
+
+    /** Addresses. */
+    private final Collection<String> addrs;
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Usage counter. */
+    private final AtomicInteger cnt = new AtomicInteger();
+
+    /** Client. */
+    private volatile GridClient cli;
+
+    /**
+     * Constructor.
+     *
+     * @param cluster Cluster name.
+     * @param addrs Addresses.
+     */
+    public MapReduceClient(String cluster, Collection<String> addrs) {
+        this.cluster = cluster;
+        this.addrs = addrs;
+    }
+
+    /**
+     * @return Cluster name.
+     */
+    public String cluster() {
+        return cluster;
+    }
+
+    /**
+     * Gets the client.
+     *
+     * @return The client.
+     */
+    public GridClient client() throws IOException {
+        GridClient cli0 = cli;
+
+        if (cli0 == null) {
+            synchronized (mux) {
+                cli0 = cli;
+
+                if (cli0 == null) {
+                    GridClientConfiguration cliCfg = new GridClientConfiguration();
+
+                    cliCfg.setProtocol(TCP);
+                    cliCfg.setServers(addrs);
+                    cliCfg.setMarshaller(new GridClientJdkMarshaller());
+                    cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day.
+                    cliCfg.setDaemon(true);
+
+                    try {
+                        cli0 = GridClientFactory.start(cliCfg);
+
+                        cli = cli0;
+                    }
+                    catch (GridClientException e) {
+                        throw new IOException("Failed to establish connection with Ignite: " + addrs, e);
+                    }
+                }
+            }
+        }
+
+        return cli0;
+    }
+
+    /**
+     * Increments usage count.
+     *
+     * @return {@code True} if succeeded and client can be used.
+     */
+    public boolean acquire() {
+        while (true) {
+            int cur = cnt.get();
+
+            if (cur < 0)
+                return false;
+
+            int next = cur + 1;
+
+            if (cnt.compareAndSet(cur, next))
+                return true;
+        }
+    }
+
+    /**
+     * Decrements the usages of the client and closes it if this is the last usage.
+     *
+     * @return {@code True} if client can be closed safely by the caller.
+     */
+    public boolean release() {
+        int cnt0 = cnt.decrementAndGet();
+
+        assert cnt0 >= 0;
+
+        if (cnt0 == 0) {
+            if (cnt.compareAndSet(0, -1)) {
+                GridClient cli0 = cli;
+
+                if (cli0 != null)
+                    cli0.close();
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13841de/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
index 0805be1..a4b5e6a 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
@@ -23,8 +23,8 @@ import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -40,13 +40,10 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider;
 import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.client.GridClient;
 import org.apache.ignite.internal.client.GridServerUnreachableException;
 import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest;
 import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
@@ -79,34 +76,12 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
-        clearClients();
-    }
-
-    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
 
-        clearClients();
-
         super.afterTest();
     }
 
-    /**
-     * @throws IgniteCheckedException If failed.
-     */
-    private void clearConnectionMap() throws IgniteCheckedException {
-        ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap =
-            GridTestUtils.getFieldValue(IgniteHadoopClientProtocolProvider.class, "cliMap");
-
-        for(IgniteInternalFuture<GridClient> fut : cliMap.values())
-            fut.get().close();
-
-        cliMap.clear();
-    }
-
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -117,18 +92,6 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
     }
 
     /**
-     *
-     */
-    private void clearClients() {
-        ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = GridTestUtils.getFieldValue(
-            IgniteHadoopClientProtocolProvider.class,
-            IgniteHadoopClientProtocolProvider.class,
-            "cliMap");
-
-        cliMap.clear();
-    }
-
-    /**
      * @throws Exception If failed.
      */
     private void beforeJob() throws Exception {
@@ -154,26 +117,31 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
     private void checkJobSubmit(Configuration conf) throws Exception {
         final Job job = Job.getInstance(conf);
 
-        job.setJobName(JOB_NAME);
+        try {
+            job.setJobName(JOB_NAME);
 
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(IntWritable.class);
 
-        job.setInputFormatClass(TextInputFormat.class);
-        job.setOutputFormatClass(OutFormat.class);
+            job.setInputFormatClass(TextInputFormat.class);
+            job.setOutputFormatClass(OutFormat.class);
 
-        job.setMapperClass(TestMapper.class);
-        job.setReducerClass(TestReducer.class);
+            job.setMapperClass(TestMapper.class);
+            job.setReducerClass(TestReducer.class);
 
-        job.setNumReduceTasks(0);
+            job.setNumReduceTasks(0);
 
-        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+            FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
 
-        job.submit();
+            job.submit();
 
-        job.waitForCompletion(false);
+            job.waitForCompletion(false);
 
-        assert job.getStatus().getState() == JobStatus.State.SUCCEEDED : job.getStatus().getState();
+            assert job.getStatus().getState() == JobStatus.State.SUCCEEDED : job.getStatus().getState();
+        }
+        finally {
+            job.getCluster().close();
+        }
     }
 
     /**
@@ -197,18 +165,25 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
      */
     @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     public void testSingleAddress() throws Exception {
-        // Don't use REST_PORT to test connection fails if the only this port is configured
-        restPort = REST_PORT + 1;
+        try {
+            // Don't use REST_PORT, to test that the connection fails if only this port is configured
+            restPort = REST_PORT + 1;
 
-        startGrids(gridCount());
+            startGrids(gridCount());
 
-        GridTestUtils.assertThrowsAnyCause(log, new Callable<Object>() {
+            GridTestUtils.assertThrowsAnyCause(log, new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    checkJobSubmit(configSingleAddress());
-                    return null;
-                }
-            },
-            GridServerUnreachableException.class, "Failed to connect to any of the servers in list");
+                        checkJobSubmit(configSingleAddress());
+                        return null;
+                    }
+                },
+                GridServerUnreachableException.class, "Failed to connect to any of the servers in list");
+        }
+        finally {
+            FileSystem fs = FileSystem.get(configSingleAddress());
+
+            fs.close();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13841de/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
index 1ef7dd0..7156a3d 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
@@ -50,7 +50,6 @@ import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest;
 import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -115,7 +114,6 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
         stopAllGrids();
 
         super.afterTestsStopped();
-//        IgniteHadoopClientProtocolProvider.cliMap.clear();
     }
 
     /** {@inheritDoc} */
@@ -196,43 +194,48 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
 
         final Job job = Job.getInstance(conf);
 
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
+        try {
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(IntWritable.class);
 
-        job.setMapperClass(TestCountingMapper.class);
-        job.setReducerClass(TestCountingReducer.class);
-        job.setCombinerClass(TestCountingCombiner.class);
+            job.setMapperClass(TestCountingMapper.class);
+            job.setReducerClass(TestCountingReducer.class);
+            job.setCombinerClass(TestCountingCombiner.class);
 
-        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
-        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+            FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+            FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
 
-        job.submit();
+            job.submit();
 
-        final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
+            final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
 
-        assertEquals(0, cntr.getValue());
+            assertEquals(0, cntr.getValue());
 
-        cntr.increment(10);
+            cntr.increment(10);
 
-        assertEquals(10, cntr.getValue());
+            assertEquals(10, cntr.getValue());
 
-        // Transferring to map phase.
-        setupLockFile.delete();
+            // Transferring to map phase.
+            setupLockFile.delete();
 
-        // Transferring to reduce phase.
-        mapLockFile.delete();
+            // Transferring to reduce phase.
+            mapLockFile.delete();
 
-        job.waitForCompletion(false);
+            job.waitForCompletion(false);
 
-        assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
+            assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
 
-        final Counters counters = job.getCounters();
+            final Counters counters = job.getCounters();
 
-        assertNotNull("counters cannot be null", counters);
-        assertEquals("wrong counters count", 3, counters.countCounters());
-        assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
-        assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
-        assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());
+            assertNotNull("counters cannot be null", counters);
+            assertEquals("wrong counters count", 3, counters.countCounters());
+            assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
+            assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
+            assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());
+        }
+        finally {
+            job.getCluster().close();
+        }
     }
 
     /**
@@ -304,114 +307,119 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
 
         final Job job = Job.getInstance(conf);
 
-        job.setJobName(JOB_NAME);
+        try {
+            job.setJobName(JOB_NAME);
 
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(IntWritable.class);
 
-        job.setMapperClass(TestMapper.class);
-        job.setReducerClass(TestReducer.class);
+            job.setMapperClass(TestMapper.class);
+            job.setReducerClass(TestReducer.class);
 
-        if (!noCombiners)
-            job.setCombinerClass(TestCombiner.class);
+            if (!noCombiners)
+                job.setCombinerClass(TestCombiner.class);
 
-        if (noReducers)
-            job.setNumReduceTasks(0);
+            if (noReducers)
+                job.setNumReduceTasks(0);
 
-        job.setInputFormatClass(TextInputFormat.class);
-        job.setOutputFormatClass(TestOutputFormat.class);
+            job.setInputFormatClass(TextInputFormat.class);
+            job.setOutputFormatClass(TestOutputFormat.class);
 
-        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
-        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+            FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+            FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
 
-        job.submit();
+            job.submit();
 
-        JobID jobId = job.getJobID();
+            JobID jobId = job.getJobID();
 
-        // Setup phase.
-        JobStatus jobStatus = job.getStatus();
-        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
-        assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
-        assert jobStatus.getMapProgress() == 0.0f;
-        assert jobStatus.getReduceProgress() == 0.0f;
+            // Setup phase.
+            JobStatus jobStatus = job.getStatus();
+            checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+            assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
+            assert jobStatus.getMapProgress() == 0.0f;
+            assert jobStatus.getReduceProgress() == 0.0f;
 
-        U.sleep(2100);
+            U.sleep(2100);
 
-        JobStatus recentJobStatus = job.getStatus();
+            JobStatus recentJobStatus = job.getStatus();
 
-        assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() :
-            "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
+            assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() : "Old="
+                + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
 
-        // Transferring to map phase.
-        setupLockFile.delete();
+            // Transferring to map phase.
+            setupLockFile.delete();
 
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                try {
-                    return F.eq(1.0f, job.getStatus().getSetupProgress());
+            assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    try {
+                        return F.eq(1.0f, job.getStatus().getSetupProgress());
+                    }
+                    catch (Exception e) {
+                        throw new RuntimeException("Unexpected exception.", e);
+                    }
                 }
-                catch (Exception e) {
-                    throw new RuntimeException("Unexpected exception.", e);
-                }
-            }
-        }, 5000L);
+            }, 5000L);
 
-        // Map phase.
-        jobStatus = job.getStatus();
-        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
-        assert jobStatus.getSetupProgress() == 1.0f;
-        assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
-        assert jobStatus.getReduceProgress() == 0.0f;
+            // Map phase.
+            jobStatus = job.getStatus();
+            checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+            assert jobStatus.getSetupProgress() == 1.0f;
+            assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
+            assert jobStatus.getReduceProgress() == 0.0f;
 
-        U.sleep(2100);
+            U.sleep(2100);
 
-        recentJobStatus = job.getStatus();
+            recentJobStatus = job.getStatus();
 
-        assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() :
-            "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress();
+            assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() : "Old=" + jobStatus.getMapProgress()
+                + ", new=" + recentJobStatus.getMapProgress();
 
-        // Transferring to reduce phase.
-        mapLockFile.delete();
+            // Transferring to reduce phase.
+            mapLockFile.delete();
 
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                try {
-                    return F.eq(1.0f, job.getStatus().getMapProgress());
-                }
-                catch (Exception e) {
-                    throw new RuntimeException("Unexpected exception.", e);
+            assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    try {
+                        return F.eq(1.0f, job.getStatus().getMapProgress());
+                    }
+                    catch (Exception e) {
+                        throw new RuntimeException("Unexpected exception.", e);
+                    }
                 }
-            }
-        }, 5000L);
+            }, 5000L);
 
-        if (!noReducers) {
-            // Reduce phase.
-            jobStatus = job.getStatus();
-            checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
-            assert jobStatus.getSetupProgress() == 1.0f;
-            assert jobStatus.getMapProgress() == 1.0f;
-            assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
+            if (!noReducers) {
+                // Reduce phase.
+                jobStatus = job.getStatus();
+                checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+                assert jobStatus.getSetupProgress() == 1.0f;
+                assert jobStatus.getMapProgress() == 1.0f;
+                assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
 
-            // Ensure that reduces progress increases.
-            U.sleep(2100);
+                // Ensure that reduces progress increases.
+                U.sleep(2100);
 
-            recentJobStatus = job.getStatus();
+                recentJobStatus = job.getStatus();
 
-            assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() :
-                "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
+                assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() : "Old="
+                    + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
 
-            reduceLockFile.delete();
-        }
+                reduceLockFile.delete();
+            }
 
-        job.waitForCompletion(false);
+            job.waitForCompletion(false);
 
-        jobStatus = job.getStatus();
-        checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
-        assert jobStatus.getSetupProgress() == 1.0f;
-        assert jobStatus.getMapProgress() == 1.0f;
-        assert jobStatus.getReduceProgress() == 1.0f;
+            jobStatus = job.getStatus();
+            checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
+            assert jobStatus.getSetupProgress() == 1.0f;
+            assert jobStatus.getMapProgress() == 1.0f;
+            assert jobStatus.getReduceProgress() == 1.0f;
 
-        dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
+            dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
+        }
+        finally {
+            job.getCluster().close();
+        }
     }
 
     /**
@@ -517,7 +525,12 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
      * Test Hadoop counters.
      */
     public enum TestCounter {
-        COUNTER1, COUNTER2, COUNTER3
+        /** First test counter; bumped by TestCountingCombiner and TestCountingReducer. */
+        COUNTER1,
+        /** Second test counter. */
+        COUNTER2,
+        /** Third test counter. */
+        COUNTER3
     }
 
     /**
@@ -535,6 +548,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
      * Test combiner that counts invocations.
      */
     public static class TestCountingCombiner extends TestReducer {
+        /** {@inheritDoc} */
         @Override public void reduce(Text key, Iterable<IntWritable> values,
             Context ctx) throws IOException, InterruptedException {
             ctx.getCounter(TestCounter.COUNTER1).increment(1);
@@ -552,6 +566,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
      * Test reducer that counts invocations.
      */
     public static class TestCountingReducer extends TestReducer {
+        /** {@inheritDoc} */
         @Override public void reduce(Text key, Iterable<IntWritable> values,
             Context ctx) throws IOException, InterruptedException {
             ctx.getCounter(TestCounter.COUNTER1).increment(1);
@@ -566,6 +581,9 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
         // No-op.
     }
 
+    /**
+     * Test output format.
+     */
     public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> {
         /** {@inheritDoc} */
         @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx)