You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:20 UTC
[19/50] [abbrv] carbondata git commit: [HOTFIX][CARBONDATA-2591] Fix
SDK CarbonReader filter issue
[HOTFIX][CARBONDATA-2591] Fix SDK CarbonReader filter issue
There are some issues in the SDK CarbonReader filter function; please check the JIRA.
This closes #2363
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/290ef5a3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/290ef5a3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/290ef5a3
Branch: refs/heads/carbonstore
Commit: 290ef5a3a90081b3c95ea0dc418f643ea5ad694f
Parents: 0ef7e55
Author: xubo245 <xu...@huawei.com>
Authored: Thu Jun 7 22:00:31 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Jun 12 10:54:04 2018 +0530
----------------------------------------------------------------------
.../core/metadata/schema/table/CarbonTable.java | 35 +++
.../apache/carbondata/core/util/CarbonUtil.java | 1 +
.../sdk/file/CarbonReaderBuilder.java | 6 +-
.../carbondata/sdk/file/CarbonReaderTest.java | 251 ++++++++++++++++++-
.../apache/carbondata/sdk/file/TestUtil.java | 14 ++
5 files changed, 304 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 6949643..20bc7a1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.metadata.schema.table;
+import java.io.File;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -57,6 +59,8 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
+
/**
* Mapping class for Carbon actual table
*/
@@ -218,6 +222,37 @@ public class CarbonTable implements Serializable {
}
}
+  /**
+   * Builds a {@link CarbonTable} whose fact-table columns are inferred from a carbon index
+   * file found directly under {@code tablePath}.
+   *
+   * <p>The directory is listed with {@link java.io.File}, so only local paths are handled
+   * here. NOTE(review): HDFS/S3 table paths would need FileFactory instead — confirm.
+   *
+   * @param tablePath directory containing the carbon index file(s)
+   * @param tableName table name passed to {@link CarbonUtil#inferSchemaFromIndexFile}
+   * @return a table built from a dummy table info whose column list is the inferred schema
+   * @throws IOException if the schema cannot be read from the index file
+   * @throws RuntimeException if {@code tablePath} contains no carbon index file
+   */
+  public static CarbonTable buildTable(
+      String tablePath,
+      String tableName) throws IOException {
+    TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
+    File[] indexFiles = new File(tablePath).listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        // Use the shared extension constant rather than a hand-written suffix.
+        return name != null && name.endsWith(CarbonTablePath.INDEX_FILE_EXT);
+      }
+    });
+    // listFiles returns null when tablePath is not a readable directory.
+    if (indexFiles == null || indexFiles.length == 0) {
+      throw new RuntimeException("Carbon index file not exists in path: " + tablePath);
+    }
+    // The schema is taken from the first listed index file; File.listFiles does not
+    // guarantee any particular order.
+    org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
+        .inferSchemaFromIndexFile(indexFiles[0].toString(), tableName);
+    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+    for (org.apache.carbondata.format.ColumnSchema thriftColumnSchema : tableInfo
+        .getFact_table().getTable_columns()) {
+      ColumnSchema columnSchema = thriftColumnSchemaToWrapperColumnSchema(thriftColumnSchema);
+      // When the index file carries no column reference id, fall back to the unique id.
+      if (columnSchema.getColumnReferenceId() == null) {
+        columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId());
+      }
+      columnSchemaList.add(columnSchema);
+    }
+    tableInfoInfer.getFactTable().setListOfColumns(columnSchemaList);
+    return CarbonTable.buildFromTableInfo(tableInfoInfer);
+  }
+
public static CarbonTable buildDummyTable(String tablePath) throws IOException {
TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
return CarbonTable.buildFromTableInfo(tableInfoInfer);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index e1e5e16..2aa4a05 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2209,6 +2209,7 @@ public final class CarbonUtil {
org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
ColumnSchema wrapperColumnSchema = new ColumnSchema();
wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
+ wrapperColumnSchema.setColumnReferenceId(externalColumnSchema.getColumnReferenceId());
wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
DataType dataType = thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 98aa6e0..83cb34e 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -179,7 +179,11 @@ public class CarbonReaderBuilder {
if (isTransactionalTable) {
table = CarbonTable.buildFromTablePath(tableName, "default", tablePath);
} else {
- table = CarbonTable.buildDummyTable(tablePath);
+ if (filterExpression != null) {
+ table = CarbonTable.buildTable(tablePath, tableName);
+ } else {
+ table = CarbonTable.buildDummyTable(tablePath);
+ }
}
final CarbonFileInputFormat format = new CarbonFileInputFormat();
final Job job = new Job(new Configuration());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index a8aa795..fb2e2bc 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -20,8 +20,7 @@ package org.apache.carbondata.sdk.file;
import java.io.*;
import java.sql.Date;
import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.Comparator;
+import java.util.*;
import org.apache.avro.generic.GenericData;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
@@ -29,6 +28,11 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -48,6 +52,12 @@ public class CarbonReaderTest extends TestCase {
+  /** Asserts TestUtil.verifyMdtFile() is false, then removes the shared test output directory. */
@After
public void verifyDMFile() {
assert (!TestUtil.verifyMdtFile());
+    String path = "./testWriteFiles";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      // Do not just print the stack trace: a leftover directory would make later tests
+      // read stale carbon files, so report the failure with its cause attached.
+      throw new RuntimeException("Failed to delete test directory: " + path, e);
+    }
}
@Test
@@ -106,6 +116,243 @@ public class CarbonReaderTest extends TestCase {
}
@Test
+  /**
+   * Reads a transactional store with an equality filter on the string column {@code name}
+   * and checks that only rows with name 'robot1' are returned.
+   */
+  public void testReadWithFilterOfTransactional() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, true);
+
+    // name = 'robot1'
+    EqualToExpression equalToExpression = new EqualToExpression(
+        new ColumnExpression("name", DataTypes.STRING),
+        new LiteralExpression("robot1", DataTypes.STRING));
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(true)
+        .projection(new String[]{"name", "age"})
+        .filter(equalToExpression)
+        .build();
+
+    int i = 0;
+    try {
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        // JUnit assertions run regardless of the JVM's -ea flag, unlike the assert keyword.
+        Assert.assertEquals("robot1", row[0]);
+        i++;
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      reader.close();
+    }
+    // Expected value first, so a failure message reports the numbers the right way round.
+    Assert.assertEquals(20, i);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  /**
+   * Reads a transactional store with {@code doubleField = 3.5 AND name = 'robot7'} and
+   * checks that exactly one matching row is returned.
+   */
+  @Test
+  public void testReadWithFilterOfTransactionalAnd() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, true);
+
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
+        new LiteralExpression("3.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
+    EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
+        new LiteralExpression("robot7", DataTypes.STRING));
+
+    AndExpression andExpression = new AndExpression(equalToExpression, equalToExpression2);
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(true)
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(andExpression)
+        .build();
+
+    int i = 0;
+    try {
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        // JUnit assertions run regardless of the JVM's -ea flag, unlike the assert keyword.
+        Assert.assertTrue(((String) row[0]).contains("robot7"));
+        Assert.assertEquals(7, (int) row[1]);
+        Assert.assertEquals(3.5, (double) row[2], 0.0);
+        i++;
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      reader.close();
+    }
+    // Expected value first, so a failure message reports the numbers the right way round.
+    Assert.assertEquals(1, i);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  /**
+   * Reads a non-transactional store with an equality filter on the string column
+   * {@code name} and checks that only rows with name 'robot1' are returned.
+   */
+  @Test
+  public void testReadWithFilterOfNonTransactionalSimple() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+
+    // name = 'robot1'
+    ColumnExpression columnExpression = new ColumnExpression("name", DataTypes.STRING);
+    EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
+        new LiteralExpression("robot1", DataTypes.STRING));
+
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age"})
+        .filter(equalToExpression)
+        .build();
+
+    int i = 0;
+    try {
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        // JUnit assertions run regardless of the JVM's -ea flag, unlike the assert keyword.
+        Assert.assertEquals("robot1", row[0]);
+        i++;
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      reader.close();
+    }
+    // Expected value first, so a failure message reports the numbers the right way round.
+    Assert.assertEquals(20, i);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  /**
+   * Reads a non-transactional store with an equality filter on the int column {@code age}
+   * and checks that exactly one row with age 1 is returned.
+   */
+  @Test
+  public void testReadWithFilterOfNonTransactional2() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+
+    // age = 1
+    ColumnExpression columnExpression = new ColumnExpression("age", DataTypes.INT);
+
+    EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
+        new LiteralExpression("1", DataTypes.INT));
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age"})
+        .filter(equalToExpression)
+        .build();
+
+    int i = 0;
+    try {
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        // JUnit assertions run regardless of the JVM's -ea flag, unlike the assert keyword.
+        Assert.assertTrue(((String) row[0]).contains("robot"));
+        Assert.assertEquals(1, (int) row[1]);
+        i++;
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      reader.close();
+    }
+    // Expected value first, so a failure message reports the numbers the right way round.
+    Assert.assertEquals(1, i);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  /**
+   * Reads a non-transactional store with {@code doubleField = 3.5 AND name = 'robot7'} and
+   * checks that exactly one matching row is returned.
+   */
+  @Test
+  public void testReadWithFilterOfNonTransactionalAnd() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
+        new LiteralExpression("3.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
+    EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
+        new LiteralExpression("robot7", DataTypes.STRING));
+
+    AndExpression andExpression = new AndExpression(equalToExpression, equalToExpression2);
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(andExpression)
+        .build();
+
+    int i = 0;
+    try {
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        // JUnit assertions run regardless of the JVM's -ea flag, unlike the assert keyword.
+        Assert.assertTrue(((String) row[0]).contains("robot7"));
+        Assert.assertEquals(7, (int) row[1]);
+        Assert.assertEquals(3.5, (double) row[2], 0.0);
+        i++;
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      reader.close();
+    }
+    // Expected value first, so a failure message reports the numbers the right way round.
+    Assert.assertEquals(1, i);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  /**
+   * Reads a non-transactional store with {@code doubleField = 3.5 OR name = 'robot7'} and
+   * checks the returned rows and their count.
+   *
+   * <p>NOTE(review): the per-row assertions require every row to be a 'robot7' row, so the
+   * row matching doubleField = 3.5 must also be a 'robot7' row. This filter therefore does
+   * not distinguish OR from the name predicate alone; consider a disjoint second predicate.
+   */
+  @Test
+  public void testReadWithFilterOfNonTransactionalOr() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
+        new LiteralExpression("3.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
+    EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
+        new LiteralExpression("robot7", DataTypes.STRING));
+
+    OrExpression orExpression = new OrExpression(equalToExpression, equalToExpression2);
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(orExpression)
+        .build();
+
+    int i = 0;
+    try {
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        // JUnit assertions run regardless of the JVM's -ea flag, unlike the assert keyword.
+        Assert.assertTrue(((String) row[0]).contains("robot7"));
+        Assert.assertEquals(7, (int) row[1] % 10);
+        Assert.assertEquals(0.5, (double) row[2] % 1, 0.0);
+        i++;
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      reader.close();
+    }
+    // Expected value first, so a failure message reports the numbers the right way round.
+    Assert.assertEquals(20, i);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+ @Test
public void testReadColumnTwice() throws IOException, InterruptedException {
String path = "./testWriteFiles";
FileUtils.deleteDirectory(new File(path));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
index 0f00d61..919472c 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -49,6 +49,20 @@ public class TestUtil {
}
/**
+ * Writes {@code rows} rows with the given schema to {@code path} and verifies the result
+ * by delegating to the full writeFilesAndVerify overload. That overload receives
+ * {@code null} as its fourth argument and {@code -1} as its sixth and seventh arguments.
+ * NOTE(review): these presumably mean "no sort columns" and "use default sizes" —
+ * confirm against the full overload's documentation.
+ *
+ * @param rows number of rows to write
+ * @param schema schema of the files to write
+ * @param path table store path to write to
+ * @param persistSchema whether to persist the schema
+ * @param isTransactionalTable whether the output is written as a transactional table
+ */
+ public static void writeFilesAndVerify(int rows, Schema schema, String path, boolean persistSchema,
+ boolean isTransactionalTable) {
+ writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, isTransactionalTable);
+ }
+
+ /**
* Invoke CarbonWriter API to write carbon files and assert the file is rewritten
* @param rows number of rows to write
* @param schema schema of the file