You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 16:38:52 UTC
[06/50] carbondata git commit: [CARBONDATA-1827] S3 Carbon
Implementation
[CARBONDATA-1827] S3 Carbon Implementation
1.Provide support for s3 in carbondata.
2.Added S3Example to create carbon table on s3.
3.Added S3CSVExample to load carbon table using csv from s3.
This closes #1805
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6ea4cab0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6ea4cab0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6ea4cab0
Branch: refs/heads/carbonstore-rebase4
Commit: 6ea4cab040d6436329b4447f5d17605aaa76c8c2
Parents: 1f44b84
Author: SangeetaGulia <sa...@knoldus.in>
Authored: Thu Sep 21 14:56:26 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Feb 27 16:59:47 2018 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 21 +++
.../filesystem/AbstractDFSCarbonFile.java | 20 ++-
.../datastore/filesystem/HDFSCarbonFile.java | 5 +-
.../core/datastore/impl/FileFactory.java | 11 +-
.../core/locks/CarbonLockFactory.java | 28 ++--
.../carbondata/core/locks/S3FileLock.java | 111 +++++++++++++
.../carbondata/core/util/CarbonProperties.java | 3 +-
.../filesystem/HDFSCarbonFileTest.java | 8 +-
examples/spark2/pom.xml | 5 +
examples/spark2/src/main/resources/data1.csv | 11 ++
.../carbondata/examples/S3CsvExample.scala | 99 +++++++++++
.../apache/carbondata/examples/S3Example.scala | 164 +++++++++++++++++++
.../spark/rdd/NewCarbonDataLoadRDD.scala | 42 ++++-
integration/spark2/pom.xml | 43 +++++
.../spark/rdd/CarbonDataRDDFactory.scala | 3 +-
.../org/apache/spark/sql/CarbonSession.scala | 3 +
16 files changed, 554 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6e6482d..2e169c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -167,6 +167,22 @@ public final class CarbonCommonConstants {
public static final String S3N_PREFIX = "s3n://";
public static final String S3A_PREFIX = "s3a://";
+ /**
+ * Access Key for s3n
+ */
+ public static final String S3N_ACCESS_KEY = "fs.s3n.awsAccessKeyId";
+ /**
+ * Secret Key for s3n
+ */
+ public static final String S3N_SECRET_KEY = "fs.s3n.awsSecretAccessKey";
+ /**
+ * Access Key for s3
+ */
+ public static final String S3_ACCESS_KEY = "fs.s3.awsAccessKeyId";
+ /**
+ * Secret Key for s3
+ */
+ public static final String S3_SECRET_KEY = "fs.s3.awsSecretAccessKey";
/**
* FS_DEFAULT_FS
@@ -941,6 +957,11 @@ public final class CarbonCommonConstants {
public static final String CARBON_LOCK_TYPE_HDFS = "HDFSLOCK";
/**
+ * S3LOCK TYPE
+ */
+ public static final String CARBON_LOCK_TYPE_S3 = "S3LOCK";
+
+ /**
* Invalid filter member log string
*/
public static final String FILTER_INVALID_MEMBER =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 68eaa21..fd5dc40 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
-public abstract class AbstractDFSCarbonFile implements CarbonFile {
+public abstract class AbstractDFSCarbonFile implements CarbonFile {
/**
* LOGGER
*/
@@ -262,18 +262,28 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
@Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
int bufferSize, boolean append) throws IOException {
Path pt = new Path(path);
- FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+ FileSystem fileSystem = pt.getFileSystem(FileFactory.getConfiguration());
FSDataOutputStream stream = null;
if (append) {
// append to a file only if file already exists else file not found
// exception will be thrown by hdfs
if (CarbonUtil.isFileExists(path)) {
- stream = fs.append(pt, bufferSize);
+ if (FileFactory.FileType.S3 == fileType) {
+ DataInputStream dataInputStream = fileSystem.open(pt);
+ int count = dataInputStream.available();
+ // create buffer
+ byte[] byteStreamBuffer = new byte[count];
+ dataInputStream.read(byteStreamBuffer);
+ stream = fileSystem.create(pt, true, bufferSize);
+ stream.write(byteStreamBuffer);
+ } else {
+ stream = fileSystem.append(pt, bufferSize);
+ }
} else {
- stream = fs.create(pt, true, bufferSize);
+ stream = fileSystem.create(pt, true, bufferSize);
}
} else {
- stream = fs.create(pt, true, bufferSize);
+ stream = fileSystem.create(pt, true, bufferSize);
}
return stream;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
index d470b47..892a556 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
@@ -107,8 +107,11 @@ public class HDFSCarbonFile extends AbstractDFSCarbonFile {
((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName),
org.apache.hadoop.fs.Options.Rename.OVERWRITE);
return true;
+ } else if (fileStatus.getPath().toString().startsWith("s3n")) {
+ fs.delete(new Path(changetoName), true);
+ return fs.rename(fileStatus.getPath(), new Path(changetoName));
} else {
- return false;
+ return fs.rename(fileStatus.getPath(), new Path(changetoName));
}
} catch (IOException e) {
LOGGER.error("Exception occured: " + e.getMessage());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 38ed2b7..f141991 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -246,7 +246,15 @@ public final class FileFactory {
*/
public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType)
throws IOException {
- return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType);
+ if (FileType.S3 == fileType) {
+ CarbonFile carbonFile = getCarbonFile(path);
+ if (carbonFile.exists()) {
+ carbonFile.delete();
+ }
+ return carbonFile.getDataOutputStream(path,fileType);
+ } else {
+ return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType);
+ }
}
/**
@@ -423,6 +431,7 @@ public final class FileFactory {
throws IOException {
FileFactory.FileType fileType = FileFactory.getFileType(directoryPath);
switch (fileType) {
+ case S3:
case HDFS:
case VIEWFS:
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
index e70e655..3226a63 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
@@ -52,18 +52,21 @@ public class CarbonLockFactory {
*/
public static ICarbonLock getCarbonLockObj(AbsoluteTableIdentifier absoluteTableIdentifier,
String lockFile) {
- switch (lockTypeConfigured) {
- case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
- return new LocalFileLock(absoluteTableIdentifier, lockFile);
- case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
- return new ZooKeeperLocking(absoluteTableIdentifier, lockFile);
-
- case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
- return new HdfsFileLock(absoluteTableIdentifier, lockFile);
-
- default:
- throw new UnsupportedOperationException("Not supported the lock type");
+ String tablePath = absoluteTableIdentifier.getTablePath();
+ if (lockTypeConfigured.equals(CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER)) {
+ return new ZooKeeperLocking(absoluteTableIdentifier, lockFile);
+ } else if (tablePath.startsWith(CarbonCommonConstants.S3A_PREFIX) ||
+ tablePath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
+ tablePath.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+ lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_S3;
+ return new S3FileLock(absoluteTableIdentifier, lockFile);
+ } else if (tablePath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
+ lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS;
+ return new HdfsFileLock(absoluteTableIdentifier, lockFile);
+ } else {
+ lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL;
+ return new LocalFileLock(absoluteTableIdentifier, lockFile);
}
}
@@ -84,6 +87,9 @@ public class CarbonLockFactory {
case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
return new HdfsFileLock(locFileLocation, lockFile);
+ case CarbonCommonConstants.CARBON_LOCK_TYPE_S3:
+ return new S3FileLock(locFileLocation, lockFile);
+
default:
throw new UnsupportedOperationException("Not supported the lock type");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
new file mode 100644
index 0000000..8836960
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.locks;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * This class is used to handle the S3 File locking.
+ * This is achieved using the concept of acquiring the data out stream using Append option.
+ */
+public class S3FileLock extends AbstractCarbonLock {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(S3FileLock.class.getName());
+ /**
+ * location s3 file location
+ */
+ private String location;
+
+ private DataOutputStream dataOutputStream;
+
+ /**
+ * @param tableIdentifier
+ * @param lockFile
+ */
+ public S3FileLock(AbsoluteTableIdentifier tableIdentifier, String lockFile) {
+ this(tableIdentifier.getTablePath(), lockFile);
+ }
+
+ /**
+ * @param lockFileLocation
+ * @param lockFile
+ */
+ public S3FileLock(String lockFileLocation, String lockFile) {
+ this.location = lockFileLocation + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+ LOGGER.info("S3 lock path:" + this.location);
+ initRetry();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.carbondata.core.locks.ICarbonLock#unlock()
+ */
+ @Override public boolean unlock() {
+ boolean status = false;
+ if (null != dataOutputStream) {
+ try {
+ dataOutputStream.close();
+ status = true;
+ } catch (IOException e) {
+ status = false;
+ } finally {
+ CarbonFile carbonFile =
+ FileFactory.getCarbonFile(location, FileFactory.getFileType(location));
+ if (carbonFile.exists()) {
+ if (carbonFile.delete()) {
+ LOGGER.info("Deleted the lock file " + location);
+ } else {
+ LOGGER.error("Not able to delete the lock file " + location);
+ status = false;
+ }
+ } else {
+ LOGGER.error(
+ "Not able to delete the lock file because it is not existed in location " + location);
+ status = false;
+ }
+ }
+ }
+ return status;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.carbondata.core.locks.ICarbonLock#lock()
+ */
+ @Override public boolean lock() {
+ try {
+ if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
+ FileFactory.createNewLockFile(location, FileFactory.getFileType(location));
+ }
+ dataOutputStream =
+ FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location));
+ return true;
+ } catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 3dc7b8f..9d52669 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -369,7 +369,8 @@ public final class CarbonProperties {
String defaultFs = configuration.get("fs.defaultFS");
if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
|| defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs
- .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX))
+ .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX) || defaultFs
+ .startsWith(CarbonCommonConstants.S3A_PREFIX))
&& !CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS.equalsIgnoreCase(lockTypeConfigured)) {
LOGGER.warn("The value \"" + lockTypeConfigured + "\" configured for key "
+ LOCK_TYPE + " is invalid for current file system. "
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
index 7726693..4018123 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java
@@ -369,7 +369,13 @@ public class HDFSCarbonFileTest {
}
};
- assertEquals(hdfsCarbonFile.renameForce(fileName), false);
+ new MockUp<WebHdfsFileSystem>(){
+ @Mock
+ public boolean rename(final Path src, final Path dst) throws IOException {
+ return true;
+ }
+ };
+ assertEquals(hdfsCarbonFile.renameForce(fileName), true);
}
@Test
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index c17f0ee..f64dc9f 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -62,6 +62,11 @@
<version>${spark.version}</version>
<scope>${spark.deps.scope}</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.2</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/examples/spark2/src/main/resources/data1.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data1.csv b/examples/spark2/src/main/resources/data1.csv
new file mode 100644
index 0000000..cf732eb
--- /dev/null
+++ b/examples/spark2/src/main/resources/data1.csv
@@ -0,0 +1,11 @@
+shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField
+1,10,1100,48.4,spark,2015-4-23 12:01:01,1.23,2015-4-23,aaa,2.5
+5,17,1140,43.4,spark,2015-7-27 12:01:02,3.45,2015-7-27,bbb,2.5
+1,11,1100,44.4,flink,2015-5-23 12:01:03,23.23,2015-5-23,ccc,2.5
+1,10,1150,43.4,spark,2015-7-24 12:01:04,254.12,2015-7-24,ddd,2.5
+1,10,1100,47.4,spark,2015-7-23 12:01:05,876.14,2015-7-23,eeee,3.5
+3,14,1160,43.4,hive,2015-7-26 12:01:06,3454.32,2015-7-26,ff,2.5
+2,10,1100,43.4,impala,2015-7-23 12:01:07,456.98,2015-7-23,ggg,2.5
+1,10,1100,43.4,spark,2015-5-23 12:01:08,32.53,2015-5-23,hhh,2.5
+4,16,1130,42.4,impala,2015-7-23 12:01:09,67.23,2015-7-23,iii,2.5
+1,10,1100,43.4,spark,2015-7-23 12:01:10,832.23,2015-7-23,jjj,2.5
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
new file mode 100644
index 0000000..b37fba8
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, SECRET_KEY}
+import org.apache.spark.sql.SparkSession
+import org.slf4j.{Logger, LoggerFactory}
+
+object S3CsvExample {
+
+ /**
+ * This example demonstrates how to create a local store and load data from CSV files on S3
+ *
+ * @param args requires four parameters "Access-key" "Secret-key"
+ * "s3 path to csv" "spark-master"
+ */
+ def main(args: Array[String]) {
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+ import org.apache.spark.sql.CarbonSession._
+ if (args.length != 4) {
+ logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
+ "<s3.csv.location> <spark-master>")
+ System.exit(0)
+ }
+
+ val spark = SparkSession
+ .builder()
+ .master(args(3))
+ .appName("S3CsvExample")
+ .config("spark.driver.host", "localhost")
+ .config("spark.hadoop." + ACCESS_KEY, args(0))
+ .config("spark.hadoop." + SECRET_KEY, args(1))
+ .getOrCreateCarbonSession()
+
+ spark.sparkContext.setLogLevel("INFO")
+
+ spark.sql(
+ s"""
+ | CREATE TABLE if not exists carbon_table1(
+ | shortField SHORT,
+ | intField INT,
+ | bigintField LONG,
+ | doubleField DOUBLE,
+ | stringField STRING,
+ | timestampField TIMESTAMP,
+ | decimalField DECIMAL(18,2),
+ | dateField DATE,
+ | charField CHAR(5),
+ | floatField FLOAT
+ | )
+ | STORED BY 'carbondata'
+ | LOCATION '$rootPath/examples/spark2/target/store'
+ | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '${ args(2) }'
+ | INTO TABLE carbon_table1
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '${ args(2) }'
+ | INTO TABLE carbon_table1
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ | SELECT *
+ | FROM carbon_table1
+ """.stripMargin).show()
+
+ spark.sql("Drop table if exists carbon_table1")
+
+ spark.stop()
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
new file mode 100644
index 0000000..d3d0a37
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+object S3Example {
+
+ /**
+ * This example demonstrates usage of
+ * 1. create carbon table with storage location on object based storage
+ * like AWS S3, Huawei OBS, etc
+ * 2. load data into carbon table, the generated file will be stored on object based storage
+ * 3. query the table.
+ *
+ * @param args requires at least three parameters "Access-key" "Secret-key"
+ * "table-path on s3" "s3-endpoint" "spark-master"
+ */
+ def main(args: Array[String]) {
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val path = s"$rootPath/examples/spark2/src/main/resources/data1.csv"
+ val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+ import org.apache.spark.sql.CarbonSession._
+ if (args.length < 3 || args.length > 5) {
+ logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
+ "<table-path-on-s3> [s3-endpoint] [spark-master]")
+ System.exit(0)
+ }
+
+ val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(2))
+ val spark = SparkSession
+ .builder()
+ .master(getSparkMaster(args))
+ .appName("S3Example")
+ .config("spark.driver.host", "localhost")
+ .config(accessKey, args(0))
+ .config(secretKey, args(1))
+ .config(endpoint, getS3EndPoint(args))
+ .getOrCreateCarbonSession()
+
+ spark.sparkContext.setLogLevel("WARN")
+
+ spark.sql("Drop table if exists carbon_table")
+
+ spark.sql(
+ s"""
+ | CREATE TABLE if not exists carbon_table(
+ | shortField SHORT,
+ | intField INT,
+ | bigintField LONG,
+ | doubleField DOUBLE,
+ | stringField STRING,
+ | timestampField TIMESTAMP,
+ | decimalField DECIMAL(18,2),
+ | dateField DATE,
+ | charField CHAR(5),
+ | floatField FLOAT
+ | )
+ | STORED BY 'carbondata'
+ | LOCATION '${ args(2) }'
+ | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE carbon_table
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ | SELECT *
+ | FROM carbon_table
+ """.stripMargin).show()
+
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE carbon_table
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE carbon_table
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+
+ val countSegment: Array[Row] =
+ spark.sql(
+ s"""
+ | SHOW SEGMENTS FOR TABLE carbon_table
+ """.stripMargin).collect()
+
+ while (countSegment.length != 3) {
+ this.wait(2000)
+ }
+
+ // Use compaction command to merge segments or small files in object based storage,
+ // this can be done periodically.
+ spark.sql("ALTER table carbon_table compact 'MAJOR'")
+ spark.sql("show segments for table carbon_table").show()
+
+ spark.sql(
+ s"""
+ | SELECT *
+ | FROM carbon_table
+ """.stripMargin).show()
+
+ spark.sql("Drop table if exists carbon_table")
+
+ spark.stop()
+ }
+
+ def getKeyOnPrefix(path: String): (String, String, String) = {
+ val endPoint = "spark.hadoop." + ENDPOINT
+ if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
+ ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
+ } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
+ ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
+ "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
+ } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+ ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
+ "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
+ } else {
+ throw new Exception("Incorrect Store Path")
+ }
+ }
+
+ def getS3EndPoint(args: Array[String]): String = {
+ if (args.length >= 4 && args(3).contains(".com")) args(3)
+ else ""
+ }
+
+ def getSparkMaster(args: Array[String]): String = {
+ if (args.length == 5) args(4)
+ else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3)
+ else "local"
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 1fa1689..917fc88 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -346,11 +347,31 @@ class NewDataFrameLoaderRDD[K, V](
sc: SparkContext,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
- prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](prev) {
+ prev: DataLoadCoalescedRDD[Row],
+ @transient hadoopConf: Configuration) extends CarbonRDD[(K, V)](prev) {
- override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ private val confBytes = {
+ val bao = new ByteArrayOutputStream()
+ val oos = new ObjectOutputStream(bao)
+ hadoopConf.write(oos)
+ oos.close()
+ CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
+ }
+ private def getConf = {
+ val configuration = new Configuration(false)
+ val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
+ .unCompressByte(confBytes))
+ val ois = new ObjectInputStream(bai)
+ configuration.readFields(ois)
+ ois.close()
+ configuration
+ }
+
+ override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val hadoopConf = getConf
+ setS3Configurations(hadoopConf)
val iter = new Iterator[(K, V)] {
val loadMetadataDetails = new LoadMetadataDetails()
val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
@@ -420,6 +441,23 @@ class NewDataFrameLoaderRDD[K, V](
iter
}
override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+
+ private def setS3Configurations(hadoopConf: Configuration): Unit = {
+ FileFactory.getConfiguration
+ .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", ""))
+ FileFactory.getConfiguration
+ .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", ""))
+ FileFactory.getConfiguration
+ .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", ""))
+ FileFactory.getConfiguration.set(CarbonCommonConstants.S3_ACCESS_KEY,
+ hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, ""))
+ FileFactory.getConfiguration.set(CarbonCommonConstants.S3_SECRET_KEY,
+ hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, ""))
+ FileFactory.getConfiguration.set(CarbonCommonConstants.S3N_ACCESS_KEY,
+ hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, ""))
+ FileFactory.getConfiguration.set(CarbonCommonConstants.S3N_SECRET_KEY,
+ hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, ""))
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index b68a55d..aac1ff6 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -57,6 +57,49 @@
<version>2.2.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <version>1.7.4</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
+ <version>0.9.0</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 09484c4..6f08154 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -976,7 +976,8 @@ object CarbonDataRDDFactory {
sqlContext.sparkContext,
new DataLoadResultImpl(),
carbonLoadModel,
- newRdd
+ newRdd,
+ sqlContext.sparkContext.hadoopConfiguration
).collect()
} catch {
case ex: Exception =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6ea4cab0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 0116d9e..935b0a6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -21,6 +21,7 @@ import java.io.File
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession.Builder
@@ -30,6 +31,7 @@ import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.util.{CarbonReflectionUtils, Utils}
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
/**
@@ -152,6 +154,7 @@ object CarbonSession {
sparkConf.setAppName(randomAppName)
}
val sc = SparkContext.getOrCreate(sparkConf)
+ setS3Configurations(sc)
// maybe this is an existing SparkContext, update its SparkConf which maybe used
// by SparkSession
options.foreach { case (k, v) => sc.conf.set(k, v) }