You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by QiangCai <gi...@git.apache.org> on 2017/11/10 14:48:28 UTC
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
GitHub user QiangCai opened a pull request:
https://github.com/apache/carbondata/pull/1485
[CARBONDATA-1572][Streaming] Add test case for streaming ingest
Be sure to do all of the following checklist to help us incorporate
your contribution quickly and easily:
- [X ] Any interfaces changed?
No
- [ X] Any backward compatibility impacted?
No
- [ X] Document update required?
No
- [X ] Testing done
Please provide details on
- Whether new unit test cases have been added or why no new tests are required?
- How it is tested? Please attach test report.
- Is it a performance related change? Please attach the performance test report.
- Any additional information to help reviewers in testing this change.
- [ X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
Yes
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/QiangCai/carbondata testcase_for_streaming
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/1485.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1485
----
commit e69cd0c3d51f171323ffe13c7a2cc2fc30bed944
Author: QiangCai <qi...@qq.com>
Date: 2017-11-10T11:57:17Z
add test case for streaming ingest
----
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150457381
--- Diff: integration/spark2/src/test/scala/org/apache/carbondata/streaming/CarbonAppendableStreamTestCase.scala ---
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+import java.util.concurrent.Executors
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{CarbonEnv, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.common.util.Spark2QueryTest
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.types.StructType
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
+
+/**
+ * Test case for org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
+ */
+class CarbonAppendableStreamTestCase extends Spark2QueryTest with BeforeAndAfterAll {
--- End diff --
fixed
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411342
--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamInputFormatTest extends TestCase {
+
+ private TaskAttemptID taskAttemptId;
+ private TaskAttemptContext taskAttemptContext;
+ private Configuration hadoopConf;
+ private AbsoluteTableIdentifier identifier;
+ private String storePath;
+
+ @Override protected void setUp() throws Exception {
+ storePath = new File("target/stream_input").getCanonicalPath();
+ String dbName = "default";
+ String tableName = "stream_table_input";
+ identifier = new AbsoluteTableIdentifier(storePath,
+ new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+ JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+ TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+ taskAttemptId = new TaskAttemptID(taskId, 0);
+
+ hadoopConf = new Configuration();
+ taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+ }
+
+ private InputSplit buildInputSplit() throws IOException {
+ CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
+ List<CarbonInputSplit> splitList = new ArrayList<>();
+ splitList.add(carbonInputSplit);
+ return new CarbonMultiBlockSplit(identifier, splitList, new String[] { "localhost" },
+ FileFormat.rowformat);
+ }
+
+ @Test public void testCreateRecordReader() {
+ try {
+ InputSplit inputSplit = buildInputSplit();
+ CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
+ RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+ Assert.assertNotNull("Failed to create record reader", recordReader);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertTrue(e.getMessage(), false);
+ }
+ }
+
--- End diff --
Since it is a input format, can you add a map reduce test case to test it?
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150431469
--- Diff: integration/spark2/src/test/scala/org/apache/carbondata/streaming/CarbonAppendableStreamTestCase.scala ---
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+import java.util.concurrent.Executors
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{CarbonEnv, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.common.util.Spark2QueryTest
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.types.StructType
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
+
+/**
+ * Test case for org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
+ */
+class CarbonAppendableStreamTestCase extends Spark2QueryTest with BeforeAndAfterAll {
--- End diff --
please merge to `TestStreamingTableOperation`
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1485
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1046/
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1485
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1632/
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150384785
--- Diff: core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---
@@ -84,6 +84,9 @@ public void setPartitionCount(String partitionCount) {
}
public long getLoadEndTime() {
+ if (timestamp == null) {
--- End diff --
for stream segment, it is null until the size reach the handoff size
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1485
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1009/
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150384766
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java ---
@@ -51,6 +51,14 @@
private static final String LOAD_Model = "mapreduce.output.carbon.load.model";
+ private static final String SEGMENT_ID = "carbon.segment.id";
+
+ public static final String HANDOFF_SIZE = "carbon.handoff.size";
+
+ public static final long HANDOFF_SIZE_MIN = 1024L * 1024 * 64;
+
+ public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
--- End diff --
fixed
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150256214
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java ---
@@ -51,6 +51,14 @@
private static final String LOAD_Model = "mapreduce.output.carbon.load.model";
+ private static final String SEGMENT_ID = "carbon.segment.id";
+
+ public static final String HANDOFF_SIZE = "carbon.handoff.size";
+
+ public static final long HANDOFF_SIZE_MIN = 1024L * 1024 * 64;
+
+ public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
--- End diff --
please add comment for these constants
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150263213
--- Diff: core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java ---
@@ -147,5 +149,39 @@ public static void tearDown() {
FileFactory.getDataOutputStream(filePath, FileFactory.FileType.VIEWFS);
assertNotNull(FileFactory.getCarbonFile(filePath, FileFactory.FileType.HDFS));
}
+
+ @Test public void testTruncateFile() {
+ FileWriter writer = null;
+ try {
+ // generate a file
+ String path = new File("truncatFile").getCanonicalPath();
+ writer = new FileWriter(path);
+ for (int i = 0; i < 4000; i++) {
+ writer.write("test truncate file method");
+ }
+ writer.close();
+ CarbonFile file = FileFactory.getCarbonFile(path);
+ assertTrue(file.getSize() == 100000L);
+
+ // truncate file to 4000 bytes
+ FileFactory.truncateFile(
+ path,
+ FileFactory.getFileType(path),
+ 4000);
+ file = FileFactory.getCarbonFile(path);
+ assertTrue(file.getSize() == 4000L);
--- End diff --
better to use assertEquals so that it will print the size
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1485
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1628/
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1485
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1667/
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411392
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---
@@ -74,8 +74,16 @@ object CarbonStore {
Row(
load.getLoadName,
load.getLoadStatus,
- new java.sql.Timestamp(load.getLoadStartTime),
- new java.sql.Timestamp(load.getLoadEndTime),
+ if (load.getLoadStartTime == -1) {
--- End diff --
I think it is better to move `if` block before line 74, like `mergedTo`
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411303
--- Diff: core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---
@@ -84,6 +84,9 @@ public void setPartitionCount(String partitionCount) {
}
public long getLoadEndTime() {
+ if (timestamp == null) {
--- End diff --
Can you replace `-1` with a meaningful constant, to improve readability
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411369
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---
@@ -74,8 +74,16 @@ object CarbonStore {
Row(
load.getLoadName,
load.getLoadStatus,
- new java.sql.Timestamp(load.getLoadStartTime),
- new java.sql.Timestamp(load.getLoadEndTime),
+ if (load.getLoadStartTime == -1) {
--- End diff --
replace `-1` with constant
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1485
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1600/
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1485
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1049/
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150456900
--- Diff: streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
@@ -68,6 +68,10 @@ class CarbonAppendableStreamSink(
}
conf
}
+ // segment max size
--- End diff --
the unit is Byte, I will comment
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/1485
Please list the test suite added in PR description
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150265010
--- Diff: core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---
@@ -84,6 +84,9 @@ public void setPartitionCount(String partitionCount) {
}
public long getLoadEndTime() {
+ if (timestamp == null) {
--- End diff --
Can it be null? why?
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1485
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1014/
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/carbondata/pull/1485
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411411
--- Diff: streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
@@ -68,6 +68,10 @@ class CarbonAppendableStreamSink(
}
conf
}
+ // segment max size
--- End diff --
Size in MB? please add in comment
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/1485
LGTM
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1485
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1664/
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150384776
--- Diff: core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java ---
@@ -147,5 +149,39 @@ public static void tearDown() {
FileFactory.getDataOutputStream(filePath, FileFactory.FileType.VIEWFS);
assertNotNull(FileFactory.getCarbonFile(filePath, FileFactory.FileType.HDFS));
}
+
+ @Test public void testTruncateFile() {
+ FileWriter writer = null;
+ try {
+ // generate a file
+ String path = new File("truncatFile").getCanonicalPath();
+ writer = new FileWriter(path);
+ for (int i = 0; i < 4000; i++) {
+ writer.write("test truncate file method");
+ }
+ writer.close();
+ CarbonFile file = FileFactory.getCarbonFile(path);
+ assertTrue(file.getSize() == 100000L);
+
+ // truncate file to 4000 bytes
+ FileFactory.truncateFile(
+ path,
+ FileFactory.getFileType(path),
+ 4000);
+ file = FileFactory.getCarbonFile(path);
+ assertTrue(file.getSize() == 4000L);
--- End diff --
fixed
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150443128
--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamOutputFormatTest extends TestCase {
+
+ private Configuration hadoopConf;
+ private TaskAttemptID taskAttemptId;
+ private CarbonLoadModel carbonLoadModel;
+ private String storePath;
+
+ @Override protected void setUp() throws Exception {
+ super.setUp();
+ JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+ TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+ taskAttemptId = new TaskAttemptID(taskId, 0);
+
+ hadoopConf = new Configuration();
+ hadoopConf.set("mapred.job.id", jobId.toString());
+ hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID().toString());
+ hadoopConf.set("mapred.task.id", taskAttemptId.toString());
+ hadoopConf.setBoolean("mapred.task.is.map", true);
+ hadoopConf.setInt("mapred.task.partition", 0);
+
+ storePath = new File("target/stream_output").getCanonicalPath();
+ String dbName = "default";
+ String tableName = "stream_table_output";
+ AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier(storePath,
+ new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+ CarbonTable table = StoreCreator.createTable(identifier);
+
+ String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+ carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier);
+ }
+
+ @Test public void testSetCarbonLoadModel() {
+ try {
+ CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+ } catch (IOException e) {
+ Assert.assertTrue("Failed to config CarbonLoadModel for CarbonStreamOutputFromat", false);
+ }
+ }
+
+ @Test public void testGetCarbonLoadModel() {
+ try {
+ CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+ CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+
+ Assert.assertNotNull("Failed to get CarbonLoadModel", model);
+ Assert.assertTrue("CarbonLoadModel should be same with previous",
+ carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp());
+
+ } catch (IOException e) {
+ Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFromat", false);
+ }
+ }
+
+ @Test public void testGetRecordWriter() {
+ CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat();
+ try {
+ CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+ TaskAttemptContext taskAttemptContext =
+ new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+ RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
+ Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertTrue(e.getMessage(), false);
+ }
+ }
+
--- End diff --
I will add the test case in another PR.
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150411356
--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamOutputFormatTest extends TestCase {
+
+ private Configuration hadoopConf;
+ private TaskAttemptID taskAttemptId;
+ private CarbonLoadModel carbonLoadModel;
+ private String storePath;
+
+ @Override protected void setUp() throws Exception {
+ super.setUp();
+ JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+ TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+ taskAttemptId = new TaskAttemptID(taskId, 0);
+
+ hadoopConf = new Configuration();
+ hadoopConf.set("mapred.job.id", jobId.toString());
+ hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID().toString());
+ hadoopConf.set("mapred.task.id", taskAttemptId.toString());
+ hadoopConf.setBoolean("mapred.task.is.map", true);
+ hadoopConf.setInt("mapred.task.partition", 0);
+
+ storePath = new File("target/stream_output").getCanonicalPath();
+ String dbName = "default";
+ String tableName = "stream_table_output";
+ AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier(storePath,
+ new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+ CarbonTable table = StoreCreator.createTable(identifier);
+
+ String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+ carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier);
+ }
+
+ @Test public void testSetCarbonLoadModel() {
+ try {
+ CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+ } catch (IOException e) {
+ Assert.assertTrue("Failed to config CarbonLoadModel for CarbonStreamOutputFromat", false);
+ }
+ }
+
+ @Test public void testGetCarbonLoadModel() {
+ try {
+ CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+ CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+
+ Assert.assertNotNull("Failed to get CarbonLoadModel", model);
+ Assert.assertTrue("CarbonLoadModel should be same with previous",
+ carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp());
+
+ } catch (IOException e) {
+ Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFromat", false);
+ }
+ }
+
+ @Test public void testGetRecordWriter() {
+ CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat();
+ try {
+ CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+ TaskAttemptContext taskAttemptContext =
+ new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+ RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
+ Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertTrue(e.getMessage(), false);
+ }
+ }
+
--- End diff --
Since it is output format, can you add a map reduce test case for it?
---
[GitHub] carbondata pull request #1485: [CARBONDATA-1572][Streaming] Add test case fo...
Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1485#discussion_r150443123
--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamInputFormatTest extends TestCase {
+
+ private TaskAttemptID taskAttemptId;
+ private TaskAttemptContext taskAttemptContext;
+ private Configuration hadoopConf;
+ private AbsoluteTableIdentifier identifier;
+ private String storePath;
+
+ @Override protected void setUp() throws Exception {
+ storePath = new File("target/stream_input").getCanonicalPath();
+ String dbName = "default";
+ String tableName = "stream_table_input";
+ identifier = new AbsoluteTableIdentifier(storePath,
+ new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+ JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+ TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+ taskAttemptId = new TaskAttemptID(taskId, 0);
+
+ hadoopConf = new Configuration();
+ taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+ }
+
+ private InputSplit buildInputSplit() throws IOException {
+ CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
+ List<CarbonInputSplit> splitList = new ArrayList<>();
+ splitList.add(carbonInputSplit);
+ return new CarbonMultiBlockSplit(identifier, splitList, new String[] { "localhost" },
+ FileFormat.rowformat);
+ }
+
+ @Test public void testCreateRecordReader() {
+ try {
+ InputSplit inputSplit = buildInputSplit();
+ CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
+ RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+ Assert.assertNotNull("Failed to create record reader", recordReader);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertTrue(e.getMessage(), false);
+ }
+ }
+
--- End diff --
I will add the test case in another PR.
---
[GitHub] carbondata issue #1485: [CARBONDATA-1572][Streaming] Add test case for strea...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1485
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/984/
---