You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2016/08/01 18:04:01 UTC
orc git commit: HIVE-14214. ORC schema evolution and predicate push
down do not work together. (Matt McCline reviewed by omalley and prasanthj)
Repository: orc
Updated Branches:
refs/heads/master 1253aff23 -> c1a504d4c
HIVE-14214. ORC schema evolution and predicate push down do not work together.
(Matt McCline reviewed by omalley and prasanthj)
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/c1a504d4
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/c1a504d4
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/c1a504d4
Branch: refs/heads/master
Commit: c1a504d4c7fbf7dbf10dea997978d093dea466a1
Parents: 1253aff
Author: Owen O'Malley <om...@apache.org>
Authored: Mon Aug 1 11:01:26 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Aug 1 11:01:26 2016 -0700
----------------------------------------------------------------------
README.md | 2 +-
.../org/apache/orc/impl/RecordReaderImpl.java | 35 +++--
.../org/apache/orc/impl/SchemaEvolution.java | 132 ++++++++++++++-----
3 files changed, 123 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/c1a504d4/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index c34b187..cad654c 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@ other and will each read all versions of ORC files.
The current build status:
* Apache ORC master ![master build status](https://travis-ci.org/apache/orc.svg?branch=master)
-### Building
+### Building
* Install java 1.7 or higher
* Install maven 3 or higher
http://git-wip-us.apache.org/repos/asf/orc/blob/c1a504d4/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 36a802e..a052ca5 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -76,6 +76,7 @@ public class RecordReaderImpl implements RecordReader {
protected final TypeDescription schema;
private final List<OrcProto.Type> types;
private final int bufferSize;
+ private final SchemaEvolution evolution;
private final boolean[] included;
private final long rowIndexStride;
private long rowInStripe = 0;
@@ -134,36 +135,50 @@ public class RecordReaderImpl implements RecordReader {
protected RecordReaderImpl(ReaderImpl fileReader,
Reader.Options options) throws IOException {
- SchemaEvolution treeReaderSchema;
this.included = options.getInclude();
included[0] = true;
if (options.getSchema() == null) {
if (LOG.isInfoEnabled()) {
- LOG.info("Schema on read not provided -- using file schema " +
+ LOG.info("Reader schema not provided -- using file schema " +
fileReader.getSchema());
}
- treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), included);
+ evolution = new SchemaEvolution(fileReader.getSchema(), included);
} else {
- // Now that we are creating a record reader for a file, validate that the schema to read
- // is compatible with the file schema.
+ // Now that we are creating a record reader for a file, validate that
+ // the schema to read is compatible with the file schema.
//
- treeReaderSchema = new SchemaEvolution(fileReader.getSchema(),
+ evolution = new SchemaEvolution(fileReader.getSchema(),
options.getSchema(),included);
+ if (LOG.isDebugEnabled() && evolution.hasConversion()) {
+ LOG.debug("ORC file " + fileReader.path.toString() +
+ " has data type conversion --\n" +
+ "reader schema: " + options.getSchema().toString() + "\n" +
+ "file schema: " + fileReader.getSchema());
+ }
}
- this.schema = treeReaderSchema.getReaderSchema();
+ this.schema = evolution.getReaderSchema();
this.path = fileReader.path;
this.codec = fileReader.codec;
this.types = fileReader.types;
this.bufferSize = fileReader.bufferSize;
this.rowIndexStride = fileReader.rowIndexStride;
SearchArgument sarg = options.getSearchArgument();
- if (sarg != null && rowIndexStride != 0) {
+ // We would like to use the sarg for predicate evaluation, but when there is
+ // data type conversion (i.e., schema evolution) we currently ignore it.
+ if (sarg != null && rowIndexStride != 0 && !evolution.hasConversion()) {
sargApp = new SargApplier(
sarg, options.getColumnNames(), rowIndexStride, types,
included.length);
} else {
sargApp = null;
+ if (evolution.hasConversion()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping stripe elimination for {} since the" +
+ " schema has data type conversion",
+ fileReader.path);
+ }
+ }
}
long rows = 0;
long skippedRows = 0;
@@ -205,8 +220,8 @@ public class RecordReaderImpl implements RecordReader {
skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
}
- reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(),
- treeReaderSchema, included, skipCorrupt);
+ reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(),
+ evolution, included, skipCorrupt);
indexes = new OrcProto.RowIndex[types.size()];
bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
advanceToNextRow(reader, 0L, true);
http://git-wip-us.apache.org/repos/asf/orc/blob/c1a504d4/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
index 07b527d..849ce0f 100644
--- a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
+++ b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
@@ -20,63 +20,77 @@ package org.apache.orc.impl;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import org.apache.orc.TypeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Take the file types and the (optional) configuration column names/types and see if there
- * has been schema evolution.
+ * Take the file types and the (optional) configuration column names/types and
+ * see if there has been schema evolution.
*/
public class SchemaEvolution {
- private final Map<Integer, TypeDescription> readerToFile;
+ private final TypeDescription[] readerFileTypes;
private final boolean[] included;
private final TypeDescription readerSchema;
- private static final Logger LOG = LoggerFactory.getLogger(SchemaEvolution.class);
+ private boolean hasConversion = false;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SchemaEvolution.class);
public SchemaEvolution(TypeDescription readerSchema, boolean[] included) {
- this.included = included;
- readerToFile = null;
+ this.included = (included == null ? null : Arrays.copyOf(included,
+ included.length));
this.readerSchema = readerSchema;
+
+ readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId()+ 1];
+ buildSameSchemaFileTypesArray();
}
public SchemaEvolution(TypeDescription fileSchema,
TypeDescription readerSchema,
boolean[] included) throws IOException {
- readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1);
- this.included = included;
+ this.included = (included == null ? null : Arrays.copyOf(included,
+ included.length));
if (checkAcidSchema(fileSchema)) {
this.readerSchema = createEventSchema(readerSchema);
} else {
this.readerSchema = readerSchema;
}
- buildMapping(fileSchema, this.readerSchema);
+
+ readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId()+ 1];
+ buildConversionFileTypesArray(fileSchema, this.readerSchema);
}
public TypeDescription getReaderSchema() {
return readerSchema;
}
+ /**
+ * Is there Schema Evolution data type conversion?
+ * @return true if reading with the reader schema needs a type conversion
+ */
+ public boolean hasConversion() {
+ return hasConversion;
+ }
+
public TypeDescription getFileType(TypeDescription readerType) {
- TypeDescription result;
- if (readerToFile == null) {
- if (included == null || included[readerType.getId()]) {
- result = readerType;
- } else {
- result = null;
- }
- } else {
- result = readerToFile.get(readerType.getId());
- }
- return result;
+ return getFileType(readerType.getId());
}
- void buildMapping(TypeDescription fileType,
- TypeDescription readerType) throws IOException {
+ /**
+ * Get the file type by reader type id.
+ * @param id the id of the reader type
+ * @return the file type recorded for that id, or null if none was mapped
+ */
+ public TypeDescription getFileType(int id) {
+ return readerFileTypes[id];
+ }
+
+ void buildConversionFileTypesArray(TypeDescription fileType,
+ TypeDescription readerType
+ ) throws IOException {
// if the column isn't included, don't map it
if (included != null && !included[readerType.getId()]) {
return;
@@ -100,10 +114,19 @@ public class SchemaEvolution {
break;
case CHAR:
case VARCHAR:
- // We do conversion when same CHAR/VARCHAR type but different maxLength.
+ // We do conversion when same CHAR/VARCHAR type but different
+ // maxLength.
+ if (fileType.getMaxLength() != readerType.getMaxLength()) {
+ hasConversion = true;
+ }
break;
case DECIMAL:
- // We do conversion when same DECIMAL type but different precision/scale.
+ // We do conversion when same DECIMAL type but different
+ // precision/scale.
+ if (fileType.getPrecision() != readerType.getPrecision() ||
+ fileType.getScale() != readerType.getScale()) {
+ hasConversion = true;
+ }
break;
case UNION:
case MAP:
@@ -113,7 +136,8 @@ public class SchemaEvolution {
List<TypeDescription> readerChildren = readerType.getChildren();
if (fileChildren.size() == readerChildren.size()) {
for(int i=0; i < fileChildren.size(); ++i) {
- buildMapping(fileChildren.get(i), readerChildren.get(i));
+ buildConversionFileTypesArray(fileChildren.get(i),
+ readerChildren.get(i));
}
} else {
isOk = false;
@@ -124,9 +148,13 @@ public class SchemaEvolution {
// allow either side to have fewer fields than the other
List<TypeDescription> fileChildren = fileType.getChildren();
List<TypeDescription> readerChildren = readerType.getChildren();
+ if (fileChildren.size() != readerChildren.size()) {
+ hasConversion = true;
+ }
int jointSize = Math.min(fileChildren.size(), readerChildren.size());
for(int i=0; i < jointSize; ++i) {
- buildMapping(fileChildren.get(i), readerChildren.get(i));
+ buildConversionFileTypesArray(fileChildren.get(i),
+ readerChildren.get(i));
}
break;
}
@@ -139,15 +167,48 @@ public class SchemaEvolution {
*/
isOk = ConvertTreeReaderFactory.canConvert(fileType, readerType);
+ hasConversion = true;
}
if (isOk) {
- readerToFile.put(readerType.getId(), fileType);
+ int id = readerType.getId();
+ if (readerFileTypes[id] != null) {
+ throw new RuntimeException("reader to file type entry already" +
+ " assigned");
+ }
+ readerFileTypes[id] = fileType;
} else {
- throw new IOException(
- String.format(
- "ORC does not support type conversion from file type %s (%d) to reader type %s (%d)",
- fileType.toString(), fileType.getId(),
- readerType.toString(), readerType.getId()));
+ throw new IOException(String.format("ORC does not support type" +
+ " conversion from file type %s" +
+ " (%d) to reader type %s (%d)",
+ fileType.toString(),
+ fileType.getId(),
+ readerType.toString(),
+ readerType.getId()));
+ }
+ }
+
+ /**
+ * Build the reader-to-file type array for the case where the reader schema
+ * is the same as the file schema.
+ */
+ private void buildSameSchemaFileTypesArray() {
+ buildSameSchemaFileTypesArrayRecurse(readerSchema);
+ }
+
+ void buildSameSchemaFileTypesArrayRecurse(TypeDescription readerType) {
+ if (included != null && !included[readerType.getId()]) {
+ return;
+ }
+ int id = readerType.getId();
+ if (readerFileTypes[id] != null) {
+ throw new RuntimeException("reader to file type entry already assigned");
+ }
+ readerFileTypes[id] = readerType;
+ List<TypeDescription> children = readerType.getChildren();
+ if (children != null) {
+ for (TypeDescription child : children) {
+ buildSameSchemaFileTypesArrayRecurse(child);
+ }
}
}
@@ -176,7 +237,8 @@ public class SchemaEvolution {
return result;
}
- public static final List<String> acidEventFieldNames= new ArrayList<String>();
+ public static final List<String> acidEventFieldNames=
+ new ArrayList<String>();
static {
acidEventFieldNames.add("operation");
acidEventFieldNames.add("originalTransaction");