You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/15 11:50:15 UTC

[10/42] carbondata git commit: [CARBONDATA-989] decompress error while load 'gz' and 'bz2' data into table

[CARBONDATA-989] decompress error while load 'gz' and 'bz2' data into table


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/59d55454
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/59d55454
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/59d55454

Branch: refs/heads/branch-1.1
Commit: 59d55454c2bd606bb59c6ea9997191fa38480916
Parents: 9e913e0
Author: ranmx <ra...@fosun.com>
Authored: Thu Apr 27 16:17:33 2017 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jun 15 12:45:27 2017 +0530

----------------------------------------------------------------------
 .../core/datastore/impl/FileFactory.java        |  12 +-
 .../core/datastore/CompressdFileTest.java       | 120 +++++++++++++++++++
 2 files changed, 129 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/59d55454/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index c32b956..de78a3f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.GzipCodec;
 
 public final class FileFactory {
@@ -131,11 +133,15 @@ public final class FileFactory {
         } else {
           stream = fs.open(pt, bufferSize);
         }
+        String codecName = null;
         if (gzip) {
-          GzipCodec codec = new GzipCodec();
-          stream = codec.createInputStream(stream);
+          codecName = GzipCodec.class.getName();
         } else if (bzip2) {
-          BZip2Codec codec = new BZip2Codec();
+          codecName = BZip2Codec.class.getName();
+        }
+        if (null != codecName) {
+          CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
+          CompressionCodec codec = ccf.getCodecByClassName(codecName);
           stream = codec.createInputStream(stream);
         }
         break;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59d55454/core/src/test/java/org/apache/carbondata/core/datastore/CompressdFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/CompressdFileTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/CompressdFileTest.java
new file mode 100644
index 0000000..14aec44
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/CompressdFileTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory.*;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import java.io.DataInputStream;
+import java.io.FileOutputStream;
+import java.io.File;
+import java.io.Writer;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import static junit.framework.TestCase.assertTrue;
+
+
+/**
+ * Checks that {@code FileFactory.getDataInputStream} yields the decompressed content of
+ * '.gz' and '.bz2' files.
+ */
+public class CompressdFileTest {
+  // Both fixture files are created by setUp() from BASE_PATH plus the codec extension and
+  // are registered for deletion in testCleanUp().
+  private static final String BASE_PATH = "../core/src/test/resources/testFile";
+  private static final String CONTENT = "hello world";
+
+  /** Generates the gzip and bzip2 fixture files once, before any test runs. */
+  @BeforeClass public static void setUp() throws Exception {
+    makeGzipFile(BASE_PATH, CONTENT);
+    makeBzip2File(BASE_PATH, CONTENT);
+  }
+
+  /** Writes {@code content}, UTF-8 encoded and gzip-compressed, to {@code path + ".gz"}. */
+  private static void makeGzipFile(String path, String content) throws Exception {
+    FileOutputStream output = new FileOutputStream(path + ".gz");
+    try {
+      Writer writer = new OutputStreamWriter(new GZIPOutputStream(output), "UTF-8");
+      try {
+        writer.write(content);
+      } finally {
+        writer.close();
+      }
+    } finally {
+      output.close();
+    }
+  }
+
+  /** Writes {@code content}, UTF-8 encoded and bzip2-compressed, to {@code path + ".bz2"}. */
+  private static void makeBzip2File(String path, String content) throws Exception {
+    FileOutputStream output = new FileOutputStream(path + ".bz2");
+    try {
+      Writer writer = new OutputStreamWriter(new BZip2CompressorOutputStream(output), "UTF-8");
+      try {
+        writer.write(content);
+      } finally {
+        writer.close();
+      }
+    } finally {
+      output.close();
+    }
+  }
+
+  @Test public void testReadGzFile() throws Exception {
+    assertContent(BASE_PATH + ".gz");
+  }
+
+  @Test public void testReadBzip2File() throws Exception {
+    assertContent(BASE_PATH + ".bz2");
+  }
+
+  /** Fails (rather than throwing a NullPointerException) when the line read is null or wrong. */
+  private static void assertContent(String path) throws Exception {
+    String actual = readCompressed(path);
+    assertTrue("Unexpected first line in " + path + ": " + actual, CONTENT.equals(actual));
+  }
+
+  /** Returns the first line read via FileFactory (FileType.HDFS), or null if the stream is empty. */
+  private static String readCompressed(String path) throws Exception {
+    DataInputStream fileReader = FileFactory.getDataInputStream(path, FileType.HDFS);
+    try {
+      BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fileReader,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+      return bufferedReader.readLine();
+    } finally {
+      // One close of the underlying stream is enough; the wrapping readers only buffer it.
+      fileReader.close();
+    }
+  }
+
+  /** Registers both generated fixture files for deletion when the JVM exits. */
+  @AfterClass public static void testCleanUp() {
+    new File(BASE_PATH + ".gz").deleteOnExit();
+    new File(BASE_PATH + ".bz2").deleteOnExit();
+  }
+}