You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jg...@apache.org on 2015/07/29 06:03:48 UTC

hadoop git commit: HDFS-8180. AbstractFileSystem Implementation for WebHdfs. Contributed by Sathosh G Nayak.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 69b095730 -> 0712a8103


HDFS-8180. AbstractFileSystem Implementation for WebHdfs. Contributed by Sathosh G Nayak.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0712a810
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0712a810
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0712a810

Branch: refs/heads/trunk
Commit: 0712a8103fec6e9a9ceba335e3c3800b85b2c7ca
Parents: 69b0957
Author: Jakob Homan <jg...@gmail.com>
Authored: Tue Jul 28 21:03:31 2015 -0700
Committer: Jakob Homan <jg...@gmail.com>
Committed: Tue Jul 28 21:03:31 2015 -0700

----------------------------------------------------------------------
 .../src/main/resources/core-default.xml         |  12 ++
 .../fs/FileContextMainOperationsBaseTest.java   |   4 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../java/org/apache/hadoop/fs/SWebHdfs.java     |  51 ++++++
 .../main/java/org/apache/hadoop/fs/WebHdfs.java |  51 ++++++
 .../main/java/org/apache/hadoop/fs/package.html |  26 +++
 .../TestSWebHdfsFileContextMainOperations.java  | 110 +++++++++++++
 .../TestWebHdfsFileContextMainOperations.java   | 157 +++++++++++++++++++
 8 files changed, 411 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0712a810/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 13702ee..bfdd453 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -596,6 +596,18 @@ for ldap providers in the same way as above does.
 </property>
 
 <property>
+  <name>fs.AbstractFileSystem.webhdfs.impl</name>
+  <value>org.apache.hadoop.fs.WebHdfs</value>
+  <description>The FileSystem for webhdfs: uris.</description>
+</property>
+
+<property>
+  <name>fs.AbstractFileSystem.swebhdfs.impl</name>
+  <value>org.apache.hadoop.fs.SWebHdfs</value>
+  <description>The FileSystem for swebhdfs: uris.</description>
+</property>
+
+<property>
   <name>fs.ftp.host</name>
   <value>0.0.0.0</value>
   <description>FTP filesystem connects to this server</description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0712a810/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextMainOperationsBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextMainOperationsBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextMainOperationsBaseTest.java
index e872176..12ec375 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextMainOperationsBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextMainOperationsBaseTest.java
@@ -1249,7 +1249,7 @@ public abstract class FileContextMainOperationsBaseTest  {
     byte[] bb = new byte[(int)len];
     FSDataInputStream fsdis = fc.open(path);
     try {
-      fsdis.read(bb);
+      fsdis.readFully(bb);
     } finally {
       fsdis.close();
     }
@@ -1310,7 +1310,7 @@ public abstract class FileContextMainOperationsBaseTest  {
     byte[] bb = new byte[data.length];
     FSDataInputStream fsdis = fc.open(path);
     try {
-      fsdis.read(bb);
+      fsdis.readFully(bb);
     } finally {
       fsdis.close();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0712a810/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9b2de81..ef12720 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -755,6 +755,8 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-7858. Improve HA Namenode Failover detection on the client. (asuresh)
 
+    HDFS-8180. AbstractFileSystem Implementation for WebHdfs. (snayak via jghoman)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0712a810/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/SWebHdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/SWebHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/SWebHdfs.java
new file mode 100644
index 0000000..cd36393
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/SWebHdfs.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * AbstractFileSystem implementation for HDFS over the web (secure).
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class SWebHdfs extends DelegateToFileSystem {
+
+  public static final String SCHEME = "swebhdfs";
+
+  /**
+   * This constructor has the signature needed by
+   * {@link AbstractFileSystem#createFileSystem(URI, Configuration)}
+   *
+   * @param theUri which must be that of swebhdfs
+   * @param conf   configuration
+   * @throws IOException
+   */
+  SWebHdfs(URI theUri, Configuration conf)
+      throws IOException, URISyntaxException {
+    super(theUri, new SWebHdfsFileSystem(), conf, SCHEME, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0712a810/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/WebHdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/WebHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/WebHdfs.java
new file mode 100644
index 0000000..dc4f6d5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/WebHdfs.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * AbstractFileSystem implementation for HDFS over the web.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class WebHdfs extends DelegateToFileSystem {
+
+  public static final String SCHEME = "webhdfs";
+
+  /**
+   * This constructor has the signature needed by
+   * {@link AbstractFileSystem#createFileSystem(URI, Configuration)}
+   *
+   * @param theUri which must be that of webhdfs
+   * @param conf   configuration
+   * @throws IOException
+   */
+  WebHdfs(URI theUri, Configuration conf)
+      throws IOException, URISyntaxException {
+    super(theUri, new WebHdfsFileSystem(), conf, SCHEME, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0712a810/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/package.html
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/package.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/package.html
new file mode 100644
index 0000000..53b2a5a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/package.html
@@ -0,0 +1,26 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+
+<p>Implementations of {@link org.apache.hadoop.fs.AbstractFileSystem} for hdfs
+    over rpc and hdfs over web.</p>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0712a810/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSWebHdfsFileContextMainOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSWebHdfsFileContextMainOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSWebHdfsFileContextMainOperations.java
new file mode 100644
index 0000000..874abd6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSWebHdfsFileContextMainOperations.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import javax.security.auth.login.LoginException;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import static org.apache.hadoop.fs.FileContextTestHelper.getDefaultBlockSize;
+import static org.apache.hadoop.fs.FileContextTestHelper.getFileData;
+
+/**
+ * Test of FileContext apis on SWebhdfs.
+ */
+public class TestSWebHdfsFileContextMainOperations
+    extends TestWebHdfsFileContextMainOperations {
+
+  private static MiniDFSCluster cluster;
+  private static Path defaultWorkingDirectory;
+  private static String keystoresDir;
+  private static String sslConfDir;
+  protected static URI webhdfsUrl;
+
+  private static final HdfsConfiguration CONF = new HdfsConfiguration();
+
+  private static final String BASEDIR =
+      System.getProperty("test.build.dir", "target/test-dir") + "/"
+          + TestSWebHdfsFileContextMainOperations.class.getSimpleName();
+  protected static int numBlocks = 2;
+  protected static final byte[] data = getFileData(numBlocks,
+      getDefaultBlockSize());
+
+  private static Configuration sslConf;
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning()
+      throws IOException, LoginException, URISyntaxException {
+
+    File base = new File(BASEDIR);
+    FileUtil.fullyDelete(base);
+    base.mkdirs();
+    keystoresDir = new File(BASEDIR).getAbsolutePath();
+    sslConf = new Configuration();
+
+    try {
+      sslConfDir = KeyStoreTestUtil
+          .getClasspathDir(TestSWebHdfsFileContextMainOperations.class);
+      KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, sslConf, false);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    CONF.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, "HTTPS_ONLY");
+    CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    CONF.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    CONF.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, "DEFAULT_AND_LOCALHOST");
+    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
+
+    cluster.waitClusterUp();
+    webhdfsUrl = new URI(SWebHdfs.SCHEME + "://" + cluster.getConfiguration(0)
+        .get(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY));
+
+    fc = FileContext.getFileContext(webhdfsUrl, CONF);
+    defaultWorkingDirectory = fc.makeQualified(new Path(
+        "/user/" + UserGroupInformation.getCurrentUser().getShortUserName()));
+    fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);
+
+  }
+
+  @Override
+  public URI getWebhdfsUrl() {
+    return webhdfsUrl;
+  }
+
+  @AfterClass
+  public static void ClusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+    FileUtil.fullyDelete(new File(BASEDIR));
+    KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0712a810/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestWebHdfsFileContextMainOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestWebHdfsFileContextMainOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestWebHdfsFileContextMainOperations.java
new file mode 100644
index 0000000..c4bf0ce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestWebHdfsFileContextMainOperations.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.FileContextTestHelper.getDefaultBlockSize;
+import static org.apache.hadoop.fs.FileContextTestHelper.getFileData;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test of FileContext apis on Webhdfs.
+ */
+public class TestWebHdfsFileContextMainOperations
+    extends FileContextMainOperationsBaseTest {
+
+  protected static MiniDFSCluster cluster;
+  private static Path defaultWorkingDirectory;
+  protected static URI webhdfsUrl;
+
+  protected static int numBlocks = 2;
+
+  protected static final byte[] data = getFileData(numBlocks,
+      getDefaultBlockSize());
+  protected static final HdfsConfiguration CONF = new HdfsConfiguration();
+
+  @Override
+  public Path getDefaultWorkingDirectory() {
+    return defaultWorkingDirectory;
+  }
+
+  public URI getWebhdfsUrl() {
+    return webhdfsUrl;
+  }
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning()
+      throws IOException, LoginException, URISyntaxException {
+
+    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
+    cluster.waitClusterUp();
+    webhdfsUrl = new URI(WebHdfs.SCHEME + "://" + cluster.getConfiguration(0)
+        .get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY));
+    fc = FileContext.getFileContext(webhdfsUrl, CONF);
+    defaultWorkingDirectory = fc.makeQualified(new Path(
+        "/user/" + UserGroupInformation.getCurrentUser().getShortUserName()));
+    fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    URI webhdfsUrlReal = getWebhdfsUrl();
+    Path testBuildData = new Path(
+        webhdfsUrlReal + "/build/test/data/" + RandomStringUtils
+            .randomAlphanumeric(10));
+    Path rootPath = new Path(testBuildData, "root-uri");
+
+    localFsRootPath = rootPath.makeQualified(webhdfsUrlReal, null);
+    fc.mkdir(getTestRootPath(fc, "test"), FileContext.DEFAULT_PERM, true);
+  }
+
+  private Path getTestRootPath(FileContext fc, String path) {
+    return fileContextTestHelper.getTestRootPath(fc, path);
+  }
+
+  @Override
+  protected boolean listCorruptedBlocksSupported() {
+    return false;
+  }
+
+  /**
+   * Test FileContext APIs when symlinks are not supported
+   * TODO: Open separate JIRA for full support of the Symlink in webhdfs
+   */
+  @Test
+  public void testUnsupportedSymlink() throws IOException {
+    /**
+     * WebHdfs client Partially supports the Symlink.
+     * creation of Symlink is supported, but the getLinkTargetPath() api is not supported currently,
+     * Implement the test case once the full support is available.
+     */
+  }
+
+  /**
+   * TODO: Open JIRA for the idiosyncrasies between hdfs and webhdfs
+   */
+  public void testSetVerifyChecksum() throws IOException {
+    final Path rootPath = getTestRootPath(fc, "test");
+    final Path path = new Path(rootPath, "zoo");
+
+    FSDataOutputStream out = fc
+        .create(path, EnumSet.of(CREATE), Options.CreateOpts.createParent());
+    try {
+      out.write(data, 0, data.length);
+    } finally {
+      out.close();
+    }
+
+    //In webhdfs scheme fc.setVerifyChecksum() can be called only after
+    // writing first few bytes but in case of the hdfs scheme we can call
+    // immediately after the creation call.
+    // instruct FS to verify checksum through the FileContext:
+    fc.setVerifyChecksum(true, path);
+
+    FileStatus fileStatus = fc.getFileStatus(path);
+    final long len = fileStatus.getLen();
+    assertTrue(len == data.length);
+    byte[] bb = new byte[(int) len];
+    FSDataInputStream fsdis = fc.open(path);
+    try {
+      fsdis.readFully(bb);
+    } finally {
+      fsdis.close();
+    }
+    assertArrayEquals(data, bb);
+  }
+
+  @AfterClass
+  public static void ClusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+}