Posted to commits@metamodel.apache.org by ka...@apache.org on 2015/08/25 08:58:41 UTC

metamodel git commit: METAMODEL-179: Fixed HdfsResource closing a shared HDFS FileSystem reference

Repository: metamodel
Updated Branches:
  refs/heads/master fc2752abd -> 226b004a4


METAMODEL-179: Fixed HdfsResource closing a shared HDFS FileSystem reference

Fixes #44
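
Background on the fix: Hadoop's FileSystem.get(conf) returns a cached
FileSystem instance that is shared by every caller using an equivalent
Configuration, so closing it also closes it for everyone else.
FileSystem.newInstance(conf) bypasses that cache and hands back a private
instance that the caller owns. A minimal sketch of the failure mode (not
part of this commit; the namenode address is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class SharedFileSystemSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000"); // placeholder

            // FileSystem.get returns a cached instance, shared by all
            // callers that use an equivalent Configuration.
            FileSystem shared1 = FileSystem.get(conf);
            FileSystem shared2 = FileSystem.get(conf);
            System.out.println(shared1 == shared2); // true: same cached object

            shared1.close(); // closes the instance for shared2 as well;
                             // shared2 would now fail with "Filesystem closed"

            // FileSystem.newInstance returns a private, uncached instance
            // that can be closed without affecting other callers.
            FileSystem fresh = FileSystem.newInstance(conf);
            fresh.close(); // safe
        }
    }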

Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/226b004a
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/226b004a
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/226b004a

Branch: refs/heads/master
Commit: 226b004a4a99f6551bcde06bdc13661938035745
Parents: fc2752a
Author: Tomasz Guzialek <to...@apache.org>
Authored: Tue Aug 25 08:58:35 2015 +0200
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Tue Aug 25 08:58:35 2015 +0200

----------------------------------------------------------------------
 CHANGES.md                                      |   1 +
 .../org/apache/metamodel/util/HdfsResource.java |   2 +-
 .../util/HdfsResourceIntegrationTest.java       | 427 +++++++++++--------
 3 files changed, 242 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metamodel/blob/226b004a/CHANGES.md
----------------------------------------------------------------------
diff --git a/CHANGES.md b/CHANGES.md
index f72768a..d3e6141 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -4,6 +4,7 @@
 * [METAMODEL-176] - Trimmed the transitive dependencies of the JDBC module.
  * [METAMODEL-170] - Dropped support for Java 6.
  * [METAMODEL-178] - Added AggregateFunction and ScalarFunction interfaces. Changed FunctionType enum to be super-interface of those. Compatibility is retained but a recompile of code using FunctionType is needed.
+ * [METAMODEL-179] - Ensured that HdfsResource is not closing a shared HDFS file system reference.
  * [METAMODEL-171] - Made integration tests for Cassandra module function properly in all environments.
 
 ### Apache MetaModel 4.3.6

http://git-wip-us.apache.org/repos/asf/metamodel/blob/226b004a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java b/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
index 316cef8..e9106ba 100644
--- a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
+++ b/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
@@ -351,7 +351,7 @@ public class HdfsResource extends AbstractResource implements Serializable {
 
     public FileSystem getHadoopFileSystem() {
         try {
-            return FileSystem.get(getHadoopConfiguration());
+            return FileSystem.newInstance(getHadoopConfiguration());
         } catch (IOException e) {
             throw new MetaModelException("Could not connect to HDFS: " + e.getMessage(), e);
         }
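
A note on the one-line change above: because getHadoopFileSystem() now
returns a fresh, uncached FileSystem on every call, the caller owns that
instance and should close it when done. A minimal usage sketch, assuming a
reachable namenode (hostname, port and path are placeholders):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.metamodel.util.HdfsResource;

    public class HdfsResourceUsageSketch {
        public static void main(String[] args) throws Exception {
            HdfsResource resource = new HdfsResource("localhost", 9000, "/tmp/data.csv");

            // The returned FileSystem is private to this caller; closing it
            // no longer disturbs other users of the same HDFS connection.
            try (FileSystem fs = resource.getHadoopFileSystem()) {
                System.out.println(fs.exists(new Path(resource.getFilepath())));
            }
        }
    }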

http://git-wip-us.apache.org/repos/asf/metamodel/blob/226b004a/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java b/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
index 03a5610..48f9caa 100644
--- a/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
+++ b/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
@@ -1,187 +1,240 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.util;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Stopwatch;
-
-
-public class HdfsResourceIntegrationTest {
-    
-    private static final Logger logger = LoggerFactory.getLogger(HdfsResourceIntegrationTest.class);
-
-    private String _filePath;
-    private String _hostname;
-    private int _port;
-
-    @Before
-    public void setUp() throws Exception {
-        final File file = new File(getPropertyFilePath());
-        final boolean configured;
-        if (file.exists()) {
-            final Properties properties = new Properties();
-            properties.load(new FileReader(file));
-            _filePath = properties.getProperty("hadoop.hdfs.file.path");
-            _hostname = properties.getProperty("hadoop.hdfs.hostname");
-            final String portString = properties.getProperty("hadoop.hdfs.port");
-            configured = _filePath != null && _hostname != null && portString != null;
-            if (configured) {
-                _port = Integer.parseInt(portString);
-            }
-        } else {
-            configured = false;
-        }
-        Assume.assumeTrue(configured);
-    }
-
-    private String getPropertyFilePath() {
-        String userHome = System.getProperty("user.home");
-        return userHome + "/metamodel-integrationtest-configuration.properties";
-    }
-
-    @Test
-    public void testReadDirectory() throws Exception {
-        final String contentString = "fun and games with Apache MetaModel and Hadoop is what we do";
-        final String[] contents = new String[] { "fun ", "and ", "games ", "with ", "Apache ", "MetaModel ", "and ", "Hadoop ", "is ", "what ", "we ", "do" };
-
-        final Configuration conf = new Configuration();
-        conf.set("fs.defaultFS", "hdfs://" + _hostname + ":" + _port);
-        final FileSystem fileSystem = FileSystem.get(conf);
-        final Path path = new Path(_filePath);
-        final boolean exists = fileSystem.exists(path);
-
-        if(exists){
-            fileSystem.delete(path, true);
-        }
-
-        fileSystem.mkdirs(path);
-
-
-        // Reverse both filename and contents to make sure it is the name and not the creation order that is sorted on.
-        int i = contents.length;
-        Collections.reverse(Arrays.asList(contents));
-        for(final String contentPart : contents){
-            final HdfsResource partResource = new HdfsResource(_hostname, _port, _filePath + "/part-" + String.format("%02d", i--));
-            partResource.write(new Action<OutputStream>() {
-                @Override
-                public void run(OutputStream out) throws Exception {
-                    out.write(contentPart.getBytes());
-                }
-            });
-        }
-
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-
-        final HdfsResource res1 = new HdfsResource(_hostname, _port, _filePath);
-
-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - start");
-
-        final String str1 = res1.read(new Func<InputStream, String>() {
-            @Override
-            public String eval(InputStream in) {
-                return FileHelper.readInputStreamAsString(in, "UTF8");
-            }
-        });
-
-        Assert.assertEquals(contentString, str1);
-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read1");
-
-        final String str2 = res1.read(new Func<InputStream, String>() {
-            @Override
-            public String eval(InputStream in) {
-                return FileHelper.readInputStreamAsString(in, "UTF8");
-            }
-        });
-        Assert.assertEquals(str1, str2);
-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read2");
-
-        res1.getHadoopFileSystem().delete(res1.getHadoopPath(), true);
-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - deleted");
-
-        Assert.assertFalse(res1.isExists());
-
-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - done");
-        stopwatch.stop();
-    }
-
-    @Test
-    public void testReadOnRealHdfsInstall() throws Exception {
-        final String contentString = "fun and games with Apache MetaModel and Hadoop is what we do";
-
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        final HdfsResource res1 = new HdfsResource(_hostname, _port, _filePath);
-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - start");
-
-        res1.write(new Action<OutputStream>() {
-            @Override
-            public void run(OutputStream out) throws Exception {
-                out.write(contentString.getBytes());
-            }
-        });
-
-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - written");
-
-        Assert.assertTrue(res1.isExists());
-
-        final String str1 = res1.read(new Func<InputStream, String>() {
-            @Override
-            public String eval(InputStream in) {
-                return FileHelper.readInputStreamAsString(in, "UTF8");
-            }
-        });
-        Assert.assertEquals(contentString, str1);
-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read1");
-
-        final String str2 = res1.read(new Func<InputStream, String>() {
-            @Override
-            public String eval(InputStream in) {
-                return FileHelper.readInputStreamAsString(in, "UTF8");
-            }
-        });
-        Assert.assertEquals(str1, str2);
-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read2");
-
-        res1.getHadoopFileSystem().delete(res1.getHadoopPath(), false);
-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - deleted");
-
-        Assert.assertFalse(res1.isExists());
-
-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - done");
-        stopwatch.stop();
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.util;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+
+public class HdfsResourceIntegrationTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(HdfsResourceIntegrationTest.class);
+
+    private String _filePath;
+    private String _hostname;
+    private int _port;
+
+    @Before
+    public void setUp() throws Exception {
+        final File file = new File(getPropertyFilePath());
+        final boolean configured;
+        if (file.exists()) {
+            final Properties properties = new Properties();
+            properties.load(new FileReader(file));
+            _filePath = properties.getProperty("hadoop.hdfs.file.path");
+            _hostname = properties.getProperty("hadoop.hdfs.hostname");
+            final String portString = properties.getProperty("hadoop.hdfs.port");
+            configured = _filePath != null && _hostname != null && portString != null;
+            if (configured) {
+                _port = Integer.parseInt(portString);
+            }
+        } else {
+            configured = false;
+        }
+        Assume.assumeTrue(configured);
+    }
+
+    private String getPropertyFilePath() {
+        String userHome = System.getProperty("user.home");
+        return userHome + "/metamodel-integrationtest-configuration.properties";
+    }
+
+    @Test
+    public void testReadDirectory() throws Exception {
+        final String contentString = "fun and games with Apache MetaModel and Hadoop is what we do";
+        final String[] contents = new String[] { "fun ", "and ", "games ", "with ", "Apache ", "MetaModel ", "and ",
+                "Hadoop ", "is ", "what ", "we ", "do" };
+
+        final Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", "hdfs://" + _hostname + ":" + _port);
+        final FileSystem fileSystem = FileSystem.get(conf);
+        final Path path = new Path(_filePath);
+        final boolean exists = fileSystem.exists(path);
+
+        if (exists) {
+            fileSystem.delete(path, true);
+        }
+
+        fileSystem.mkdirs(path);
+
+        // Reverse both filename and contents to make sure it is the name and
+        // not the creation order that is sorted on.
+        int i = contents.length;
+        Collections.reverse(Arrays.asList(contents));
+        for (final String contentPart : contents) {
+            final HdfsResource partResource = new HdfsResource(_hostname, _port, _filePath + "/part-"
+                    + String.format("%02d", i--));
+            partResource.write(new Action<OutputStream>() {
+                @Override
+                public void run(OutputStream out) throws Exception {
+                    out.write(contentPart.getBytes());
+                }
+            });
+        }
+
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+
+        final HdfsResource res1 = new HdfsResource(_hostname, _port, _filePath);
+
+        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - start");
+
+        final String str1 = res1.read(new Func<InputStream, String>() {
+            @Override
+            public String eval(InputStream in) {
+                return FileHelper.readInputStreamAsString(in, "UTF8");
+            }
+        });
+
+        Assert.assertEquals(contentString, str1);
+        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read1");
+
+        final String str2 = res1.read(new Func<InputStream, String>() {
+            @Override
+            public String eval(InputStream in) {
+                return FileHelper.readInputStreamAsString(in, "UTF8");
+            }
+        });
+        Assert.assertEquals(str1, str2);
+        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read2");
+
+        res1.getHadoopFileSystem().delete(res1.getHadoopPath(), true);
+        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - deleted");
+
+        Assert.assertFalse(res1.isExists());
+
+        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - done");
+        stopwatch.stop();
+    }
+
+    @Test
+    public void testReadOnRealHdfsInstall() throws Exception {
+        final String contentString = "fun and games with Apache MetaModel and Hadoop is what we do";
+
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        final HdfsResource res1 = new HdfsResource(_hostname, _port, _filePath);
+        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - start");
+
+        res1.write(new Action<OutputStream>() {
+            @Override
+            public void run(OutputStream out) throws Exception {
+                out.write(contentString.getBytes());
+            }
+        });
+
+        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - written");
+
+        Assert.assertTrue(res1.isExists());
+
+        final String str1 = res1.read(new Func<InputStream, String>() {
+            @Override
+            public String eval(InputStream in) {
+                return FileHelper.readInputStreamAsString(in, "UTF8");
+            }
+        });
+        Assert.assertEquals(contentString, str1);
+        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read1");
+
+        final String str2 = res1.read(new Func<InputStream, String>() {
+            @Override
+            public String eval(InputStream in) {
+                return FileHelper.readInputStreamAsString(in, "UTF8");
+            }
+        });
+        Assert.assertEquals(str1, str2);
+        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read2");
+
+        res1.getHadoopFileSystem().delete(res1.getHadoopPath(), false);
+        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - deleted");
+
+        Assert.assertFalse(res1.isExists());
+
+        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - done");
+        stopwatch.stop();
+    }
+
+    @Test
+    public void testFileSystemNotBeingClosed() throws Exception {
+        HdfsResource resourceToRead = null;
+        try {
+            resourceToRead = new HdfsResource(_hostname, _port, _filePath);
+            resourceToRead.write(new Action<OutputStream>() {
+
+                @Override
+                public void run(OutputStream out) throws Exception {
+                    FileHelper.writeString(out, "testFileSystemNotBeingClosed");
+                }
+            });
+
+            // Collect failures from the reader threads; calling Assert.fail in
+            // an UncaughtExceptionHandler would not fail the JUnit test itself.
+            final List<Throwable> threadFailures = Collections.synchronizedList(new ArrayList<Throwable>());
+            Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
+                public void uncaughtException(Thread th, Throwable ex) {
+                    threadFailures.add(ex);
+                }
+            };
+
+            final List<Thread> threads = new ArrayList<Thread>();
+            for (int i = 0; i < 10; i++) {
+                final HdfsResource res = new HdfsResource(_hostname, _port, _filePath);
+
+                Thread thread = new Thread(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        res.read(new Action<InputStream>() {
+
+                            @Override
+                            public void run(InputStream is) throws Exception {
+                                String readAsString = FileHelper.readAsString(FileHelper.getReader(is, "UTF-8"));
+                                Assert.assertNotNull(readAsString);
+                                Assert.assertEquals("testFileSystemNotBeingClosed", readAsString);
+                            }
+                        });
+                    }
+                });
+                thread.setUncaughtExceptionHandler(exceptionHandler);
+                thread.start();
+                threads.add(thread);
+            }
+
+            // Wait for all reader threads to finish before cleaning up, so the
+            // file is not deleted while a read may still be in progress.
+            for (Thread thread : threads) {
+                thread.join();
+            }
+            Assert.assertTrue("Caught exceptions in reader threads: " + threadFailures, threadFailures.isEmpty());
+        } finally {
+            if (resourceToRead != null) {
+                final FileSystem fileSystem = resourceToRead.getHadoopFileSystem();
+                final Path resourceToReadPath = new Path(resourceToRead.getFilepath());
+                if (fileSystem.exists(resourceToReadPath)) {
+                    fileSystem.delete(resourceToReadPath, true);
+                }
+            }
+        }
+    }
+}
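
For reference, these integration tests are skipped (via JUnit's Assume in
setUp) unless ~/metamodel-integrationtest-configuration.properties supplies
the HDFS connection details. A minimal example of that file (the values are
placeholders for your own cluster):

    # ~/metamodel-integrationtest-configuration.properties
    hadoop.hdfs.hostname=localhost
    hadoop.hdfs.port=9000
    hadoop.hdfs.file.path=/user/metamodel/integrationtest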