You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2015/08/25 08:58:41 UTC
metamodel git commit: METAMODEL-179: Fixed
Repository: metamodel
Updated Branches:
refs/heads/master fc2752abd -> 226b004a4
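METAMODEL-179: Fixed HdfsResource closing a shared HDFS file system reference (getHadoopFileSystem() now uses FileSystem.newInstance instead of the cached FileSystem.get)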
Fixes #44
Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/226b004a
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/226b004a
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/226b004a
Branch: refs/heads/master
Commit: 226b004a4a99f6551bcde06bdc13661938035745
Parents: fc2752a
Author: Tomasz Guzialek <to...@apache.org>
Authored: Tue Aug 25 08:58:35 2015 +0200
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Tue Aug 25 08:58:35 2015 +0200
----------------------------------------------------------------------
CHANGES.md | 1 +
.../org/apache/metamodel/util/HdfsResource.java | 2 +-
.../util/HdfsResourceIntegrationTest.java | 427 +++++++++++--------
3 files changed, 242 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metamodel/blob/226b004a/CHANGES.md
----------------------------------------------------------------------
diff --git a/CHANGES.md b/CHANGES.md
index f72768a..d3e6141 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -4,6 +4,7 @@
* [METAMODEL-176] - Trimmed the transient dependencies of the JDBC module.
* [METAMODEL-170] - Dropped support for Java 6.
* [METAMODEL-178] - Added AggregateFunction and ScalarFunction interfaces. Changed FunctionType enum to be super-interface of those. Compatibility is retained but a recompile of code using FunctionType is needed.
+ * [METAMODEL-179] - Ensured that HdfsResource is not closing a shared HDFS file system reference.
* [METAMODEL-171] - Made integration tests for Cassandra module function properly in all environments.
### Apache MetaModel 4.3.6
http://git-wip-us.apache.org/repos/asf/metamodel/blob/226b004a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java b/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
index 316cef8..e9106ba 100644
--- a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
+++ b/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
@@ -351,7 +351,7 @@ public class HdfsResource extends AbstractResource implements Serializable {
public FileSystem getHadoopFileSystem() {
try {
- return FileSystem.get(getHadoopConfiguration());
+ return FileSystem.newInstance(getHadoopConfiguration());
} catch (IOException e) {
throw new MetaModelException("Could not connect to HDFS: " + e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/226b004a/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java b/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
index 03a5610..48f9caa 100644
--- a/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
+++ b/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
@@ -1,187 +1,240 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.util;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Stopwatch;
-
-
-public class HdfsResourceIntegrationTest {
-
- private static final Logger logger = LoggerFactory.getLogger(HdfsResourceIntegrationTest.class);
-
- private String _filePath;
- private String _hostname;
- private int _port;
-
- @Before
- public void setUp() throws Exception {
- final File file = new File(getPropertyFilePath());
- final boolean configured;
- if (file.exists()) {
- final Properties properties = new Properties();
- properties.load(new FileReader(file));
- _filePath = properties.getProperty("hadoop.hdfs.file.path");
- _hostname = properties.getProperty("hadoop.hdfs.hostname");
- final String portString = properties.getProperty("hadoop.hdfs.port");
- configured = _filePath != null && _hostname != null && portString != null;
- if (configured) {
- _port = Integer.parseInt(portString);
- }
- } else {
- configured = false;
- }
- Assume.assumeTrue(configured);
- }
-
- private String getPropertyFilePath() {
- String userHome = System.getProperty("user.home");
- return userHome + "/metamodel-integrationtest-configuration.properties";
- }
-
- @Test
- public void testReadDirectory() throws Exception {
- final String contentString = "fun and games with Apache MetaModel and Hadoop is what we do";
- final String[] contents = new String[] { "fun ", "and ", "games ", "with ", "Apache ", "MetaModel ", "and ", "Hadoop ", "is ", "what ", "we ", "do" };
-
- final Configuration conf = new Configuration();
- conf.set("fs.defaultFS", "hdfs://" + _hostname + ":" + _port);
- final FileSystem fileSystem = FileSystem.get(conf);
- final Path path = new Path(_filePath);
- final boolean exists = fileSystem.exists(path);
-
- if(exists){
- fileSystem.delete(path, true);
- }
-
- fileSystem.mkdirs(path);
-
-
- // Reverse both filename and contents to make sure it is the name and not the creation order that is sorted on.
- int i = contents.length;
- Collections.reverse(Arrays.asList(contents));
- for(final String contentPart : contents){
- final HdfsResource partResource = new HdfsResource(_hostname, _port, _filePath + "/part-" + String.format("%02d", i--));
- partResource.write(new Action<OutputStream>() {
- @Override
- public void run(OutputStream out) throws Exception {
- out.write(contentPart.getBytes());
- }
- });
- }
-
- final Stopwatch stopwatch = Stopwatch.createStarted();
-
- final HdfsResource res1 = new HdfsResource(_hostname, _port, _filePath);
-
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - start");
-
- final String str1 = res1.read(new Func<InputStream, String>() {
- @Override
- public String eval(InputStream in) {
- return FileHelper.readInputStreamAsString(in, "UTF8");
- }
- });
-
- Assert.assertEquals(contentString, str1);
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read1");
-
- final String str2 = res1.read(new Func<InputStream, String>() {
- @Override
- public String eval(InputStream in) {
- return FileHelper.readInputStreamAsString(in, "UTF8");
- }
- });
- Assert.assertEquals(str1, str2);
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read2");
-
- res1.getHadoopFileSystem().delete(res1.getHadoopPath(), true);
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - deleted");
-
- Assert.assertFalse(res1.isExists());
-
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - done");
- stopwatch.stop();
- }
-
- @Test
- public void testReadOnRealHdfsInstall() throws Exception {
- final String contentString = "fun and games with Apache MetaModel and Hadoop is what we do";
-
- final Stopwatch stopwatch = Stopwatch.createStarted();
- final HdfsResource res1 = new HdfsResource(_hostname, _port, _filePath);
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - start");
-
- res1.write(new Action<OutputStream>() {
- @Override
- public void run(OutputStream out) throws Exception {
- out.write(contentString.getBytes());
- }
- });
-
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - written");
-
- Assert.assertTrue(res1.isExists());
-
- final String str1 = res1.read(new Func<InputStream, String>() {
- @Override
- public String eval(InputStream in) {
- return FileHelper.readInputStreamAsString(in, "UTF8");
- }
- });
- Assert.assertEquals(contentString, str1);
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read1");
-
- final String str2 = res1.read(new Func<InputStream, String>() {
- @Override
- public String eval(InputStream in) {
- return FileHelper.readInputStreamAsString(in, "UTF8");
- }
- });
- Assert.assertEquals(str1, str2);
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read2");
-
- res1.getHadoopFileSystem().delete(res1.getHadoopPath(), false);
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - deleted");
-
- Assert.assertFalse(res1.isExists());
-
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - done");
- stopwatch.stop();
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.util;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+
+public class HdfsResourceIntegrationTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(HdfsResourceIntegrationTest.class);
+
+ private String _filePath;
+ private String _hostname;
+ private int _port;
+
+ @Before
+ public void setUp() throws Exception {
+ final File file = new File(getPropertyFilePath());
+ final boolean configured;
+ if (file.exists()) {
+ final Properties properties = new Properties();
+ properties.load(new FileReader(file));
+ _filePath = properties.getProperty("hadoop.hdfs.file.path");
+ _hostname = properties.getProperty("hadoop.hdfs.hostname");
+ final String portString = properties.getProperty("hadoop.hdfs.port");
+ configured = _filePath != null && _hostname != null && portString != null;
+ if (configured) {
+ _port = Integer.parseInt(portString);
+ }
+ } else {
+ configured = false;
+ }
+ Assume.assumeTrue(configured);
+ }
+
+ private String getPropertyFilePath() {
+ String userHome = System.getProperty("user.home");
+ return userHome + "/metamodel-integrationtest-configuration.properties";
+ }
+
+ @Test
+ public void testReadDirectory() throws Exception {
+ final String contentString = "fun and games with Apache MetaModel and Hadoop is what we do";
+ final String[] contents = new String[] { "fun ", "and ", "games ", "with ", "Apache ", "MetaModel ", "and ",
+ "Hadoop ", "is ", "what ", "we ", "do" };
+
+ final Configuration conf = new Configuration();
+ conf.set("fs.defaultFS", "hdfs://" + _hostname + ":" + _port);
+ final FileSystem fileSystem = FileSystem.get(conf);
+ final Path path = new Path(_filePath);
+ final boolean exists = fileSystem.exists(path);
+
+ if (exists) {
+ fileSystem.delete(path, true);
+ }
+
+ fileSystem.mkdirs(path);
+
+ // Reverse both filename and contents to make sure it is the name and
+ // not the creation order that is sorted on.
+ int i = contents.length;
+ Collections.reverse(Arrays.asList(contents));
+ for (final String contentPart : contents) {
+ final HdfsResource partResource = new HdfsResource(_hostname, _port, _filePath + "/part-"
+ + String.format("%02d", i--));
+ partResource.write(new Action<OutputStream>() {
+ @Override
+ public void run(OutputStream out) throws Exception {
+ out.write(contentPart.getBytes());
+ }
+ });
+ }
+
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+
+ final HdfsResource res1 = new HdfsResource(_hostname, _port, _filePath);
+
+ logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - start");
+
+ final String str1 = res1.read(new Func<InputStream, String>() {
+ @Override
+ public String eval(InputStream in) {
+ return FileHelper.readInputStreamAsString(in, "UTF8");
+ }
+ });
+
+ Assert.assertEquals(contentString, str1);
+ logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read1");
+
+ final String str2 = res1.read(new Func<InputStream, String>() {
+ @Override
+ public String eval(InputStream in) {
+ return FileHelper.readInputStreamAsString(in, "UTF8");
+ }
+ });
+ Assert.assertEquals(str1, str2);
+ logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read2");
+
+ res1.getHadoopFileSystem().delete(res1.getHadoopPath(), true);
+ logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - deleted");
+
+ Assert.assertFalse(res1.isExists());
+
+ logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - done");
+ stopwatch.stop();
+ }
+
+ @Test
+ public void testReadOnRealHdfsInstall() throws Exception {
+ final String contentString = "fun and games with Apache MetaModel and Hadoop is what we do";
+
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ final HdfsResource res1 = new HdfsResource(_hostname, _port, _filePath);
+ logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - start");
+
+ res1.write(new Action<OutputStream>() {
+ @Override
+ public void run(OutputStream out) throws Exception {
+ out.write(contentString.getBytes());
+ }
+ });
+
+ logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - written");
+
+ Assert.assertTrue(res1.isExists());
+
+ final String str1 = res1.read(new Func<InputStream, String>() {
+ @Override
+ public String eval(InputStream in) {
+ return FileHelper.readInputStreamAsString(in, "UTF8");
+ }
+ });
+ Assert.assertEquals(contentString, str1);
+ logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read1");
+
+ final String str2 = res1.read(new Func<InputStream, String>() {
+ @Override
+ public String eval(InputStream in) {
+ return FileHelper.readInputStreamAsString(in, "UTF8");
+ }
+ });
+ Assert.assertEquals(str1, str2);
+ logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - read2");
+
+ res1.getHadoopFileSystem().delete(res1.getHadoopPath(), false);
+ logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - deleted");
+
+ Assert.assertFalse(res1.isExists());
+
+ logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - done");
+ stopwatch.stop();
+ }
+
+ @Test
+ public void testFileSystemNotBeingClosed() throws IOException {
+ HdfsResource resourceToRead = null;
+ try {
+ resourceToRead = new HdfsResource(_hostname, _port, _filePath);
+ resourceToRead.write(new Action<OutputStream>() {
+
+ @Override
+ public void run(OutputStream out) throws Exception {
+ FileHelper.writeString(out, "testFileSystemNotBeingClosed");
+ }
+ });
+
+ Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread th, Throwable ex) {
+ Assert.fail("Caught exception in the thread: " + ex);
+ }
+ };
+
+ for (int i = 0; i < 10; i++) {
+ final HdfsResource res = new HdfsResource(_hostname, _port, _filePath);
+
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ res.read(new Action<InputStream>() {
+
+ @Override
+ public void run(InputStream is) throws Exception {
+ String readAsString = FileHelper.readAsString(FileHelper.getReader(is, "UTF-8"));
+ Assert.assertNotNull(readAsString);
+ Assert.assertEquals("testFileSystemNotBeingClosed", readAsString);
+ }
+ });
+
+ }
+ });
+ thread.setUncaughtExceptionHandler(exceptionHandler);
+ thread.start();
+ }
+ } finally {
+ if (resourceToRead != null) {
+ final FileSystem fileSystem = resourceToRead.getHadoopFileSystem();
+ final Path resourceToReadPath = new Path(resourceToRead.getFilepath());
+ if (fileSystem.exists(resourceToReadPath)) {
+ fileSystem.delete(resourceToReadPath, true);
+ }
+ }
+ }
+ }
+}