You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:36 UTC
[49/50] [abbrv] carbondata git commit: [CARBONDATA-1173] Stream
ingestion - write path framework
[CARBONDATA-1173] Stream ingestion - write path framework
This closes #1064
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/441907ee
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/441907ee
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/441907ee
Branch: refs/heads/streaming_ingest
Commit: 441907ee84801de264916af7633c649a8e4a13f4
Parents: ad25ffc
Author: Aniket Adnaik <an...@gmail.com>
Authored: Thu Jun 15 11:57:43 2017 -0700
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Oct 10 11:03:26 2017 +0800
----------------------------------------------------------------------
.../streaming/CarbonStreamingCommitInfo.java | 108 ++++++++++
.../streaming/CarbonStreamingConstants.java | 25 +++
.../streaming/CarbonStreamingMetaStore.java | 40 ++++
.../streaming/CarbonStreamingMetaStoreImpl.java | 56 ++++++
.../core/util/path/CarbonTablePath.java | 10 +
.../streaming/CarbonStreamingOutputFormat.java | 66 +++++++
.../streaming/CarbonStreamingRecordWriter.java | 196 +++++++++++++++++++
.../org/apache/spark/sql/CarbonSource.scala | 39 +++-
.../CarbonStreamingOutpurWriteFactory.scala | 88 +++++++++
.../streaming/CarbonStreamingOutputWriter.scala | 98 ++++++++++
10 files changed, 719 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/441907ee/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
new file mode 100644
index 0000000..6cf303a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+/**
+ * Commit info for streaming writes
+ * The commit info can be used to recover valid offset in the file
+ * in the case of write failure.
+ */
+public class CarbonStreamingCommitInfo {
+
+  // Database that owns the target table.
+  private final String dataBase;
+
+  // Table the streaming write belongs to.
+  private final String table;
+
+  // Commit time, stored exactly as passed to the constructor.
+  private final long commitTime;
+
+  // Segment identifier, as passed to the constructor.
+  private final long segmentID;
+
+  // Partition identifier, as passed to the constructor.
+  private final String partitionID;
+
+  // Batch identifier, as passed to the constructor.
+  private final long batchID;
+
+  // Never assigned in this class, so getFileOffset() currently returns null.
+  private String fileOffset;
+
+  // Future use; the constructor always sets it to -1.
+  private final long transactionID;
+
+  /**
+   * Captures the identifying fields of one streaming commit.
+   * fileOffset is left unset and transactionID is fixed to -1.
+   */
+  public CarbonStreamingCommitInfo(String dataBase, String table, long commitTime,
+      long segmentID, String partitionID, long batchID) {
+    this.transactionID = -1;
+    this.batchID = batchID;
+    this.partitionID = partitionID;
+    this.segmentID = segmentID;
+    this.commitTime = commitTime;
+    this.table = table;
+    this.dataBase = dataBase;
+  }
+
+  public String getDataBase() {
+    return this.dataBase;
+  }
+
+  public String getTable() {
+    return this.table;
+  }
+
+  public long getCommitTime() {
+    return this.commitTime;
+  }
+
+  public long getSegmentID() {
+    return this.segmentID;
+  }
+
+  public String getPartitionID() {
+    return this.partitionID;
+  }
+
+  public long getBatchID() {
+    return this.batchID;
+  }
+
+  public String getFileOffset() {
+    return this.fileOffset;
+  }
+
+  public long getTransactionID() {
+    return this.transactionID;
+  }
+
+  /**
+   * Formats the info as dataBase.table.segmentID$partitionID.
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+    text.append(dataBase).append('.').append(table).append('.');
+    text.append(segmentID).append('$').append(partitionID);
+    return text.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/441907ee/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
new file mode 100644
index 0000000..db7186f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+public class CarbonStreamingConstants {
+  // Fallback block size (1 GB = 2^30 bytes) for stream files.
+  public static final long DEFAULT_CARBON_STREAM_FILE_BLOCK_SIZE = 1L << 30;
+
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/441907ee/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
new file mode 100644
index 0000000..fa3746c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+
+import java.io.IOException;
+
+/**
+ * Generic interface for storing commit info for streaming ingest
+ */
+public interface CarbonStreamingMetaStore {
+
+  /** Looks up the commit info stored for one segment/partition of a table. */
+  CarbonStreamingCommitInfo getStreamingCommitInfo(String dataBase, String table,
+      long segmentID, String partitionID) throws IOException;
+
+  /**
+   * Stores updated commit info.
+   * NOTE(review): parameter type is the store itself, not the commit info - confirm.
+   */
+  void updateStreamingCommitInfo(CarbonStreamingMetaStore commitInfo) throws IOException;
+
+  /** Recovers streaming data for the given commit info. */
+  void recoverStreamingData(CarbonStreamingCommitInfo commitInfo) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/441907ee/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
new file mode 100644
index 0000000..0afe962
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+import java.io.IOException;
+
+/**
+ * JSON format can be used to store the metadata
+ */
+public class CarbonStreamingMetaStoreImpl implements CarbonStreamingMetaStore {
+
+  /**
+   * Fetches commit info from the metastore.
+   * Not implemented yet: always returns null.
+   */
+  @Override
+  public CarbonStreamingCommitInfo getStreamingCommitInfo(String dataBase, String table,
+      long segmentID, String partitionID) throws IOException {
+    // Placeholder result; no lookup is performed.
+    return null;
+  }
+
+  /**
+   * Updates commit info in the metastore.
+   * Not implemented yet: the call does nothing.
+   */
+  @Override
+  public void updateStreamingCommitInfo(CarbonStreamingMetaStore commitInfo)
+      throws IOException {
+    // No-op for now.
+  }
+
+  /**
+   * Recovers streaming data using the valid offset in the commit info.
+   * Not implemented yet: the call does nothing.
+   */
+  @Override
+  public void recoverStreamingData(CarbonStreamingCommitInfo commitInfo) throws IOException {
+    // No-op for now.
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/441907ee/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 0910afc..8f4fa26 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -51,6 +51,16 @@ public class CarbonTablePath extends Path {
protected static final String INDEX_FILE_EXT = ".carbonindex";
protected static final String DELETE_DELTA_FILE_EXT = ".deletedelta";
+  /**
+   * Streaming ingest paths; STREAM_FILE_NAME_EXT is public as the spark2 module reads it.
+   */
+  protected static final String STREAM_PREFIX = "Streaming";
+  public static final String STREAM_FILE_NAME_EXT = ".carbondata.stream";
+  protected static final String STREAM_FILE_BEING_WRITTEN = "in-progress.carbondata.stream";
+  protected static final String STREAM_FILE_BEING_WRITTEN_META = "in-progress.meta";
+  protected static final String STREAM_COMPACTION_STATUS = "streaming_compaction_status";
+  protected static final String STREAM_FILE_LOCK = "streaming_in_use.lock";
+
protected String tablePath;
protected CarbonTableIdentifier carbonTableIdentifier;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/441907ee/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
new file mode 100644
index 0000000..350684e
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.streaming.CarbonStreamingConstants;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+
+/**
+ * Output format to write streaming data to carbondata file
+ *
+ * @param <V> - type of record
+ */
+public class CarbonStreamingOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+ public static long getBlockSize(Configuration conf) {
+ return conf.getLong("dfs.block.size",
+ CarbonStreamingConstants.DEFAULT_CARBON_STREAM_FILE_BLOCK_SIZE);
+ }
+
+ public static void setBlockSize(Configuration conf, long blockSize) {
+ conf.setLong("dfs.block.size", blockSize);
+ }
+
+ /**
+ * When getRecordWriter may need to override
+ * to provide correct path including streaming segment name
+ */
+ @Override
+ public CarbonStreamingRecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
+ throws IOException, InterruptedException {
+
+ Configuration conf = job.getConfiguration();
+
+ String keyValueSeparator = conf.get(
+ CSVInputFormat.DELIMITER,
+ CSVInputFormat.DELIMITER_DEFAULT);
+
+ return new CarbonStreamingRecordWriter<K, V>(
+ conf,
+ getDefaultWorkFile(job, null),
+ keyValueSeparator);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/441907ee/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
new file mode 100644
index 0000000..9d1951f
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+
+public class CarbonStreamingRecordWriter<K, V> extends RecordWriter<K, V> {
+
+  // Charset name used for every string-to-bytes conversion in this writer.
+  private static final String utf8 = "UTF-8";
+
+  // Record terminator, encoded once when the class is initialized.
+  private static final byte[] newline;
+
+  static {
+    try {
+      newline = "\n".getBytes(utf8);
+    } catch (UnsupportedEncodingException uee) {
+      throw new IllegalArgumentException("Can't find " + utf8 + " encoding");
+    }
+  }
+
+  // Stream on the in-progress file; reassigned by initOut().
+  private FSDataOutputStream outputStream;
+
+  private FileSystem fs;
+
+  // In-progress file; commit() renames it to its committed name.
+  private Path file;
+
+  // True while outputStream is closed, i.e. between closeInternal() and initOut().
+  private volatile boolean isClosed;
+
+  // Encoded separator written between a key and a value.
+  private final byte[] keyValueSeparator;
+
+  /**
+   * Creates the in-progress file and points the output stream at it.
+   *
+   * @throws IOException if the file cannot be created; overwriting is disabled
+   */
+  public void initOut() throws IOException {
+    outputStream = fs.create(file, false);
+    isClosed = false;
+  }
+
+  /**
+   * Opens a writer on the given file.
+   *
+   * @param conf              configuration used to obtain the FileSystem
+   * @param file              file to create and write records to
+   * @param keyValueSeparator text written between a key and a value
+   * @throws IOException if the file system or the file cannot be opened
+   */
+  public CarbonStreamingRecordWriter(
+      Configuration conf,
+      Path file,
+      String keyValueSeparator) throws IOException {
+    // Encode the separator before touching the file system, so that a failure
+    // here cannot leave an open stream or an empty file behind.
+    try {
+      this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
+    } catch (UnsupportedEncodingException uee) {
+      throw new IllegalArgumentException("Can't find " + utf8 + " encoding");
+    }
+
+    this.file = file;
+    fs = FileSystem.get(conf);
+    // Inlined rather than calling the overridable initOut() from a constructor.
+    outputStream = fs.create(file, false);
+    isClosed = false;
+  }
+
+  /** Opens a writer on the given file with "," as the key/value separator. */
+  public CarbonStreamingRecordWriter(Configuration conf, Path file) throws IOException {
+    this(conf, file, ",");
+  }
+
+  /**
+   * Write Object to byte stream.
+   * Text is copied from its backing array; anything else is written as the
+   * UTF-8 bytes of its toString().
+   */
+  private void writeObject(Object o) throws IOException {
+    if (o instanceof Text) {
+      Text to = (Text) o;
+      // Only the first getLength() bytes of the backing array are valid.
+      outputStream.write(to.getBytes(), 0, to.getLength());
+    } else {
+      outputStream.write(o.toString().getBytes(utf8));
+    }
+  }
+
+  /**
+   * Write streaming data as text file (temporary).
+   * A record is laid out as key, separator, value, newline. A null or
+   * NullWritable field is omitted, and the separator is written only when
+   * both fields are present. Nothing is written when both are absent.
+   */
+  @Override
+  public synchronized void write(K key, V value) throws IOException {
+    boolean isNULLKey = key == null || key instanceof NullWritable;
+    boolean isNULLValue = value == null || value instanceof NullWritable;
+
+    if (isNULLKey && isNULLValue) {
+      return;
+    }
+    if (!isNULLKey) {
+      writeObject(key);
+    }
+    // Require both fields: past the guard above "either present" always holds
+    // and would put a stray separator in front of value-only records.
+    if (!isNULLKey && !isNULLValue) {
+      outputStream.write(keyValueSeparator);
+    }
+    if (!isNULLValue) {
+      writeObject(value);
+    }
+    outputStream.write(newline);
+  }
+
+  // Closes the stream once; later calls are no-ops until initOut() reopens it.
+  private void closeInternal() throws IOException {
+    if (!isClosed) {
+      outputStream.close();
+      isClosed = true;
+    }
+  }
+
+  /** Flushes buffered data via hflush() on the underlying stream. */
+  public void flush() throws IOException {
+    outputStream.hflush();
+  }
+
+  /** Returns the current position reported by the underlying stream. */
+  public long getOffset() throws IOException {
+    return outputStream.getPos();
+  }
+
+  /**
+   * Closes the in-progress file and renames it, within the same directory,
+   * to the carbon data prefix followed by the current time in milliseconds.
+   *
+   * @param finalCommit when false, a new in-progress file is opened afterwards
+   * @throws IOException if closing, renaming or reopening fails
+   */
+  public void commit(boolean finalCommit) throws IOException {
+    closeInternal();
+
+    Path commitFile = new Path(file.getParent(),
+        CarbonTablePath.getCarbonDataPrefix() + System.currentTimeMillis());
+    // rename() can signal failure through its return value; ignoring it would
+    // silently drop the commit.
+    if (!fs.rename(file, commitFile)) {
+      throw new IOException("Failed to rename " + file + " to " + commitFile);
+    }
+
+    if (!finalCommit) {
+      initOut();
+    }
+  }
+
+  /** Closes the underlying stream; the task context is not used. */
+  @Override
+  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+    closeInternal();
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/441907ee/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index f4f8b75..d496de2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -21,16 +21,19 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.commons.lang.StringUtils
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -41,12 +44,13 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
/**
* Carbon relation provider compliant to data source api.
* Creates carbon relations
*/
class CarbonSource extends CreatableRelationProvider with RelationProvider
- with SchemaRelationProvider with DataSourceRegister {
+ with SchemaRelationProvider with DataSourceRegister with FileFormat {
override def shortName(): String = "carbondata"
@@ -54,7 +58,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
CarbonEnv.getInstance(sqlContext.sparkSession)
- // if path is provided we can directly create Hadoop relation. \
+ // if path is provided we can directly create Hadoop relation.
// Otherwise create datasource relation
parameters.get("tablePath") match {
case Some(path) => CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
@@ -178,7 +182,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
/**
* Returns the path of the table
*
- * @param sparkSession
+ * @param sparkSession
* @param dbName
* @param tableName
* @return
@@ -203,11 +207,32 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
(relation.tableMeta.tablePath, parameters)
}
} catch {
- case ex: Exception =>
- throw new Exception(s"Do not have $dbName and $tableName", ex)
+ case ex: Exception =>
+ throw new Exception(s"Do not have $dbName and $tableName", ex)
}
}
+  /**
+   * Supplies the writer factory for a write job. None of the arguments is consulted;
+   * a new [[CarbonStreamingOutputWriterFactory]] is returned on every call.
+   */
+  def prepareWrite(
+      sparkSession: SparkSession, job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    new CarbonStreamingOutputWriterFactory()
+  }
+
+  /**
+   * Reports a fixed schema rather than inspecting `files`:
+   * a single string column named "value".
+   */
+  def inferSchema(sparkSession: SparkSession, options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    val valueOnlySchema = new StructType().add("value", StringType)
+    Some(valueOnlySchema)
+  }
+
}
object CarbonSource {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/441907ee/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
new file mode 100644
index 0000000..be69885
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
@@ -0,0 +1,88 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.streaming
+
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+
+class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
+
+  /**
+   * Called by each task on the executor side when writing to a
+   * [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]]; builds the
+   * [[org.apache.spark.sql.execution.datasources.OutputWriter]] for that task.
+   *
+   * @param path       Path to write the file.
+   * @param dataSchema Schema of the rows to be written; not used by this factory.
+   * @param context    The Hadoop MapReduce task context.
+   * @return a new [[CarbonStreamingOutputWriter]] bound to `path` and `context`
+   */
+  override def newInstance(
+      path: String,
+      dataSchema: StructType,
+      context: TaskAttemptContext): CarbonStreamingOutputWriter = {
+    val writer = new CarbonStreamingOutputWriter(path, context)
+    writer
+  }
+
+  /**
+   * File extension used for stream files.
+   *
+   * @param context The Hadoop MapReduce task context; not used.
+   * @return the value of `CarbonTablePath.STREAM_FILE_NAME_EXT`
+   */
+  override def getFileExtension(context: TaskAttemptContext): String =
+    CarbonTablePath.STREAM_FILE_NAME_EXT
+
+}
+
+object CarbonStreamingOutpurWriterFactory {
+
+  // Registry of writers keyed by output path.
+  private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
+
+  /**
+   * Registers `writer` under `path`; throws IllegalArgumentException when a
+   * writer is already registered for that path.
+   */
+  def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
+    // putIfAbsent checks the key and inserts in one atomic step. Note that
+    // ConcurrentHashMap.contains tests values, not keys, so it cannot be used here.
+    if (writers.putIfAbsent(path, writer) != null) {
+      throw new IllegalArgumentException(s"$path writer already exists")
+    }
+  }
+
+  /** Returns the writer registered for `path`, or null when there is none. */
+  def getWriter(path: String): CarbonStreamingOutputWriter = writers.get(path)
+
+  /** Tells whether a writer is registered for `path`. */
+  def containsWriter(path: String): Boolean = writers.containsKey(path)
+
+  /** Drops the registration for `path`, if any. */
+  def removeWriter(path: String): Unit = {
+    writers.remove(path)
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/441907ee/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
new file mode 100644
index 0000000..dfc8ff3
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{NullWritable, Text}
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.Row
+
+import org.apache.carbondata.hadoop.streaming.{CarbonStreamingOutputFormat, CarbonStreamingRecordWriter}
+
+class CarbonStreamingOutputWriter (
+    path: String,
+    context: TaskAttemptContext)
+  extends OutputWriter {
+
+  // Shared across rows; its content is overwritten by every writeInternal call.
+  private[this] val buffer = new Text()
+
+  // Record writer whose work file is forced to `path`.
+  private val recordWriter: CarbonStreamingRecordWriter[NullWritable, Text] = {
+    val streamingFormat = new CarbonStreamingOutputFormat[NullWritable, Text] () {
+      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+        new Path(path)
+      }
+
+      /*
+       May need to override
+       def getOutputCommiter(c: TaskAttemptContext): OutputCommitter = {
+         null
+       }
+      */
+    }
+
+    val created = streamingFormat.getRecordWriter(context)
+    created.asInstanceOf[CarbonStreamingRecordWriter[NullWritable, Text]]
+  }
+
+  /**
+   * Not supported: always throws, directing callers to writeInternal.
+   */
+  override def write(row: Row): Unit = {
+    throw new UnsupportedOperationException("call writeInternal")
+  }
+
+  /**
+   * Writes column 0 of `row`, read as a UTF-8 string, as the value of one
+   * record with a NullWritable key.
+   */
+  override protected [sql] def writeInternal(row: InternalRow): Unit = {
+    val payload = row.getUTF8String(0).getBytes
+    buffer.set(payload)
+    recordWriter.write(NullWritable.get(), buffer)
+  }
+
+  /** Path this writer was created for. */
+  def getpath: String = path
+
+  /** Closes the underlying record writer. */
+  override def close(): Unit = {
+    recordWriter.close(context)
+  }
+
+  /** Flushes the underlying record writer. */
+  def flush(): Unit = {
+    recordWriter.flush()
+  }
+
+  /** Current offset reported by the underlying record writer. */
+  def getPos(): Long = {
+    recordWriter.getOffset()
+  }
+
+  /**
+   * Commits the data written so far.
+   *
+   * @param finalCommit passed through to the record writer's commit
+   */
+  def commit(finalCommit: Boolean): Unit = {
+    recordWriter.commit(finalCommit)
+  }
+}