Posted to commits@hive.apache.org by sz...@apache.org on 2022/03/04 11:27:43 UTC
[hive] branch master updated: HIVE-25975: Optimize ClusteredWriter for bucketed Iceberg tables (#3060) (Adam Szita, reviewed by Peter Vary and Marton Bod)
This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 8bded40 HIVE-25975: Optimize ClusteredWriter for bucketed Iceberg tables (#3060) (Adam Szita, reviewed by Peter Vary and Marton Bod)
8bded40 is described below
commit 8bded407ec3bf782db16f569da7ddc2c5a235628
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Fri Mar 4 12:27:26 2022 +0100
HIVE-25975: Optimize ClusteredWriter for bucketed Iceberg tables (#3060) (Adam Szita, reviewed by Peter Vary and Marton Bod)
---
.../iceberg/mr/hive/GenericUDFIcebergBucket.java | 201 +++++++++++
.../apache/iceberg/mr/hive/HiveIcebergSerDe.java | 7 +
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 64 +++-
.../iceberg/mr/hive/HiveIcebergTestUtils.java | 4 +-
.../mr/hive/TestHiveIcebergOutputCommitter.java | 18 +-
.../queries/positive/dynamic_partition_writes.q | 46 ++-
.../positive/dynamic_partition_writes.q.out | 366 +++++++++++++++++++--
iceberg/patched-iceberg-core/pom.xml | 3 +-
.../org/apache/iceberg/io/ClusteredWriter.java | 158 ---------
.../hadoop/hive/ql/exec/FunctionRegistry.java | 7 +
.../hive/ql/metadata/HiveStorageHandler.java | 18 +
.../ql/optimizer/SortedDynPartitionOptimizer.java | 163 ++++++---
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 10 +-
.../hadoop/hive/ql/plan/DynamicPartitionCtx.java | 21 ++
14 files changed, 822 insertions(+), 264 deletions(-)
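In short: ClusteredWriter requires incoming records to be clustered by partition, so this change wires a new iceberg_bucket UDF into Hive's SortedDynPartitionOptimizer as a custom sort expression, letting rows be sorted by the computed Iceberg bucket value instead of the raw column value. For orientation only, a minimal standalone sketch (not part of the commit; the class name BucketTransformSketch is illustrative) of the Iceberg bucket transform that the new UDF wraps:

import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;

public class BucketTransformSketch {
  public static void main(String[] args) {
    // Hash a string value into one of 2 buckets - the same call the UDF below makes for string inputs.
    Transform<String, Integer> bucket = Transforms.bucket(Types.StringType.get(), 2);
    System.out.println(bucket.apply("EUR"));   // a stable bucket ordinal in [0, 2)
  }
}

Sorting on this computed value, rather than on the raw column as before, is what allows ClusteredWriter to receive each bucket partition as a single contiguous run of records.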
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/GenericUDFIcebergBucket.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/GenericUDFIcebergBucket.java
new file mode 100644
index 0000000..cab4bb1
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/GenericUDFIcebergBucket.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorConverter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * GenericUDFIcebergBucket - UDF that wraps around Iceberg's bucket transform function
+ */
+@Description(name = "iceberg_bucket",
+ value = "_FUNC_(value, bucketCount) - " +
+ "Returns the bucket value calculated by Iceberg bucket transform function ",
+ extended = "Example:\n > SELECT _FUNC_('A bucket full of ice!', 5);\n 4")
+public class GenericUDFIcebergBucket extends GenericUDF {
+ private final IntWritable result = new IntWritable();
+ private int numBuckets = -1;
+ private transient PrimitiveObjectInspector argumentOI;
+ private transient ObjectInspectorConverters.Converter converter;
+
+ @FunctionalInterface
+ private interface UDFEvalFunction<T> {
+ void apply(T argument) throws HiveException;
+ }
+
+ private transient UDFEvalFunction<DeferredObject> evaluator;
+
+ @Override
+ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+ if (arguments.length != 2) {
+ throw new UDFArgumentLengthException(
+ "ICEBERG_BUCKET requires 2 arguments (value, bucketCount), but got " + arguments.length);
+ }
+
+ numBuckets = getNumBuckets(arguments[1]);
+
+ if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UDFArgumentException(
+ "ICEBERG_BUCKET first argument takes primitive types, got " + argumentOI.getTypeName());
+ }
+ argumentOI = (PrimitiveObjectInspector) arguments[0];
+
+ PrimitiveObjectInspector.PrimitiveCategory inputType = argumentOI.getPrimitiveCategory();
+ ObjectInspector outputOI = null;
+ switch (inputType) {
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ converter = new PrimitiveObjectInspectorConverter.StringConverter(argumentOI);
+ Transform<String, Integer> stringTransform = Transforms.bucket(Types.StringType.get(), numBuckets);
+ evaluator = arg -> {
+ String val = (String) converter.convert(arg.get());
+ result.set(stringTransform.apply(val));
+ };
+ break;
+
+ case BINARY:
+ converter = new PrimitiveObjectInspectorConverter.BinaryConverter(argumentOI,
+ PrimitiveObjectInspectorFactory.writableBinaryObjectInspector);
+ Transform<ByteBuffer, Integer> byteBufferTransform = Transforms.bucket(Types.BinaryType.get(), numBuckets);
+ evaluator = arg -> {
+ BytesWritable val = (BytesWritable) converter.convert(arg.get());
+ ByteBuffer byteBuffer = ByteBuffer.wrap(val.getBytes(), 0, val.getLength());
+ result.set(byteBufferTransform.apply(byteBuffer));
+ };
+ break;
+
+ case INT:
+ converter = new PrimitiveObjectInspectorConverter.IntConverter(argumentOI,
+ PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+ Transform<Integer, Integer> intTransform = Transforms.bucket(Types.IntegerType.get(), numBuckets);
+ evaluator = arg -> {
+ IntWritable val = (IntWritable) converter.convert(arg.get());
+ result.set(intTransform.apply(val.get()));
+ };
+ break;
+
+ case LONG:
+ converter = new PrimitiveObjectInspectorConverter.LongConverter(argumentOI,
+ PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+ Transform<Long, Integer> longTransform = Transforms.bucket(Types.LongType.get(), numBuckets);
+ evaluator = arg -> {
+ LongWritable val = (LongWritable) converter.convert(arg.get());
+ result.set(longTransform.apply(val.get()));
+ };
+ break;
+
+ case DECIMAL:
+ DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(argumentOI);
+ Type.PrimitiveType decimalIcebergType = Types.DecimalType.of(decimalTypeInfo.getPrecision(),
+ decimalTypeInfo.getScale());
+
+ converter = new PrimitiveObjectInspectorConverter.HiveDecimalConverter(argumentOI,
+ PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector);
+ Transform<BigDecimal, Integer> bigDecimalTransform = Transforms.bucket(decimalIcebergType, numBuckets);
+ evaluator = arg -> {
+ HiveDecimalWritable val = (HiveDecimalWritable) converter.convert(arg.get());
+ result.set(bigDecimalTransform.apply(val.getHiveDecimal().bigDecimalValue()));
+ };
+ break;
+
+ case FLOAT:
+ converter = new PrimitiveObjectInspectorConverter.FloatConverter(argumentOI,
+ PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
+ Transform<Float, Integer> floatTransform = Transforms.bucket(Types.FloatType.get(), numBuckets);
+ evaluator = arg -> {
+ FloatWritable val = (FloatWritable) converter.convert(arg.get());
+ result.set(floatTransform.apply(val.get()));
+ };
+ break;
+
+ case DOUBLE:
+ converter = new PrimitiveObjectInspectorConverter.DoubleConverter(argumentOI,
+ PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ Transform<Double, Integer> doubleTransform = Transforms.bucket(Types.DoubleType.get(), numBuckets);
+ evaluator = arg -> {
+ DoubleWritable val = (DoubleWritable) converter.convert(arg.get());
+ result.set(doubleTransform.apply(val.get()));
+ };
+ break;
+
+ default:
+ throw new UDFArgumentException(
+ " ICEBERG_BUCKET() only takes STRING/CHAR/VARCHAR/BINARY/INT/LONG/DECIMAL/FLOAT/DOUBLE" +
+ " types as first argument, got " + inputType);
+ }
+
+ outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+ return outputOI;
+ }
+
+ private static int getNumBuckets(ObjectInspector arg) throws UDFArgumentException {
+ UDFArgumentException udfArgumentException = new UDFArgumentException("ICEBERG_BUCKET() second argument can only " +
+ "take an int type, but got " + arg.getTypeName());
+ if (arg.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw udfArgumentException;
+ }
+ PrimitiveObjectInspector.PrimitiveCategory inputType = ((PrimitiveObjectInspector) arg).getPrimitiveCategory();
+ if (inputType != PrimitiveObjectInspector.PrimitiveCategory.INT) {
+ throw udfArgumentException;
+ }
+ return ((WritableConstantIntObjectInspector) arg).getWritableConstantValue().get();
+ }
+
+ @Override
+ public Object evaluate(DeferredObject[] arguments) throws HiveException {
+
+ DeferredObject argument = arguments[0];
+ if (argument == null) {
+ return null;
+ } else {
+ evaluator.apply(argument);
+ }
+
+ return result;
+ }
+
+ @Override
+ public String getDisplayString(String[] children) {
+ return getStandardDisplayString("iceberg_bucket", children);
+ }
+}
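To exercise the UDF above in isolation, a hypothetical unit-test style snippet (not part of the commit; the class name and the chosen values are illustrative). The constant object inspector mirrors how the bucketCount literal reaches initialize():

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.iceberg.mr.hive.GenericUDFIcebergBucket;

public class IcebergBucketUdfSketch {
  public static void main(String[] args) throws Exception {
    GenericUDFIcebergBucket udf = new GenericUDFIcebergBucket();
    // Argument 0: the value to bucket; argument 1: the bucket count, which must be a constant int.
    ObjectInspector valueOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector bucketCountOI = PrimitiveObjectInspectorFactory
        .getPrimitiveWritableConstantObjectInspector(TypeInfoFactory.intTypeInfo, new IntWritable(2));
    udf.initialize(new ObjectInspector[] {valueOI, bucketCountOI});
    Object bucket = udf.evaluate(new GenericUDF.DeferredObject[] {
        new GenericUDF.DeferredJavaObject("EUR"),
        new GenericUDF.DeferredJavaObject(new IntWritable(2))});
    System.out.println(bucket);   // an IntWritable holding the bucket ordinal computed for "EUR"
  }
}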
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index dc799cc..1ba824e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
@@ -146,6 +147,12 @@ public class HiveIcebergSerDe extends AbstractSerDe {
}
}
+ // Currently ClusteredWriter is used which requires that records are ordered by partition keys.
+ // Here we ensure that SortedDynPartitionOptimizer will kick in and do the sorting.
+ // TODO: remove once we have both Fanout and ClusteredWriter available: HIVE-25948
+ HiveConf.setIntVar(configuration, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD, 1);
+ HiveConf.setVar(configuration, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+
try {
this.inspector = IcebergObjectInspector.create(projectedSchema);
} catch (Exception e) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 2dd92a3..a6777d3 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -31,6 +30,8 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -63,6 +66,7 @@ import org.apache.hadoop.hive.ql.stats.Partish;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
@@ -85,7 +89,9 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTest
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.base.Throwables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +102,25 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
private static final String ICEBERG_URI_PREFIX = "iceberg://";
private static final Splitter TABLE_NAME_SPLITTER = Splitter.on("..");
private static final String TABLE_NAME_SEPARATOR = "..";
+ /**
+ * Function template for producing a custom sort expression function:
+ * Takes the source column index and the bucket count to create a function where the Iceberg bucket UDF is used to build
+ * the sort expression, e.g. iceberg_bucket(_col2, 5)
+ */
+ private static final transient BiFunction<Integer, Integer, Function<List<ExprNodeDesc>, ExprNodeDesc>>
+ BUCKET_SORT_EXPR =
+ (idx, bucket) -> cols -> {
+ try {
+ ExprNodeDesc icebergBucketSourceCol = cols.get(idx);
+ return ExprNodeGenericFuncDesc.newInstance(new GenericUDFIcebergBucket(), "iceberg_bucket",
+ Lists.newArrayList(
+ icebergBucketSourceCol,
+ new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, bucket)
+ ));
+ } catch (UDFArgumentException e) {
+ throw new RuntimeException(e);
+ }
+ };
static final String WRITE_KEY = "HiveIcebergStorageHandler_write";
@@ -284,7 +309,6 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
@Override
public List<PartitionTransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
- List<PartitionTransformSpec> result = new ArrayList<>();
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
return table.spec().fields().stream().map(f -> {
@@ -308,6 +332,42 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
}
@Override
+ public DynamicPartitionCtx createDPContext(HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table hmsTable)
+ throws SemanticException {
+ TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
+ Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
+ if (table.spec().isUnpartitioned()) {
+ return null;
+ }
+
+ // Iceberg currently doesn't have publicly accessible partition transform information, hence we use the string parsing above
+ List<PartitionTransformSpec> partitionTransformSpecs = getPartitionTransformSpec(hmsTable);
+
+ DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(Maps.newLinkedHashMap(),
+ hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
+ hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
+ List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs = Lists.newLinkedList();
+ dpCtx.setCustomSortExpressions(customSortExprs);
+
+ Map<String, Integer> fieldOrderMap = Maps.newHashMap();
+ List<Types.NestedField> fields = table.schema().columns();
+ for (int i = 0; i < fields.size(); ++i) {
+ fieldOrderMap.put(fields.get(i).name(), i);
+ }
+
+ for (PartitionTransformSpec spec : partitionTransformSpecs) {
+ int order = fieldOrderMap.get(spec.getColumnName());
+ if (PartitionTransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
+ customSortExprs.add(BUCKET_SORT_EXPR.apply(order, spec.getTransformParam().get()));
+ } else {
+ customSortExprs.add(cols -> cols.get(order).clone());
+ }
+ }
+
+ return dpCtx;
+ }
+
+ @Override
public String getFileFormatPropertyKey() {
return TableProperties.DEFAULT_FILE_FORMAT;
}
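For a concrete picture of what one entry of customSortExprs produces, the hypothetical snippet below (not part of the commit; column names and the class name are illustrative) reproduces what BUCKET_SORT_EXPR.apply(1, 2) yields when handed the row's column expressions, i.e. the sort key iceberg_bucket(_col1, 2):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.iceberg.mr.hive.GenericUDFIcebergBucket;

public class BucketSortExprSketch {
  public static void main(String[] args) throws Exception {
    // Column expressions as SortedDynPartitionOptimizer would pass them to the custom sort expression.
    List<ExprNodeDesc> cols = new ArrayList<>();
    cols.add(new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "_col0", null, false));
    cols.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "_col1", null, false));

    // Build iceberg_bucket(_col1, 2): the bucket value, not the raw column, becomes the sort key.
    List<ExprNodeDesc> children = new ArrayList<>();
    children.add(cols.get(1));
    children.add(new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 2));
    ExprNodeDesc sortKey = ExprNodeGenericFuncDesc.newInstance(
        new GenericUDFIcebergBucket(), "iceberg_bucket", children);
    System.out.println(sortKey.getExprString());
  }
}

Identity-partitioned columns take the other branch in createDPContext and are simply cloned as their own sort expression.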
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
index b50bf4e..03784ca 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
@@ -253,8 +253,8 @@ public class HiveIcebergTestUtils {
List<Record> sortedExpected = new ArrayList<>(expected);
List<Record> sortedActual = new ArrayList<>(actual);
// Sort based on the specified column
- sortedExpected.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy)));
- sortedActual.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy)));
+ sortedExpected.sort(Comparator.comparingInt(record -> record.get(sortBy).hashCode()));
+ sortedActual.sort(Comparator.comparingInt(record -> record.get(sortBy).hashCode()));
Assert.assertEquals(sortedExpected.size(), sortedActual.size());
for (int i = 0; i < sortedExpected.size(); ++i) {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 24c43b3..2e3f5aa 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -77,7 +77,7 @@ public class TestHiveIcebergOutputCommitter {
);
private static final PartitionSpec PARTITIONED_SPEC =
- PartitionSpec.builderFor(CUSTOMER_SCHEMA).bucket("customer_id", 3).build();
+ PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("customer_id").build();
@Rule
public TemporaryFolder temp = new TemporaryFolder();
@@ -123,8 +123,7 @@ public class TestHiveIcebergOutputCommitter {
List<Record> expected = writeRecords(table.name(), 1, 0, true, false, conf);
committer.commitJob(new JobContextImpl(conf, JOB_ID));
- // Expecting 3 files with fanout-, 4 with ClusteredWriter where writing to already completed partitions is allowed.
- HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 4);
+ HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2);
HiveIcebergTestUtils.validateData(table, expected, 0);
}
@@ -137,7 +136,7 @@ public class TestHiveIcebergOutputCommitter {
committer.commitJob(new JobContextImpl(conf, JOB_ID));
HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2);
- HiveIcebergTestUtils.validateData(table, expected, 0);
+ HiveIcebergTestUtils.validateData(table, expected, 1);
}
@Test
@@ -148,9 +147,8 @@ public class TestHiveIcebergOutputCommitter {
List<Record> expected = writeRecords(table.name(), 2, 0, true, false, conf);
committer.commitJob(new JobContextImpl(conf, JOB_ID));
- // Expecting 6 files with fanout-, 8 with ClusteredWriter where writing to already completed partitions is allowed.
- HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 8);
- HiveIcebergTestUtils.validateData(table, expected, 0);
+ HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 4);
+ HiveIcebergTestUtils.validateData(table, expected, 1);
}
@Test
@@ -174,7 +172,7 @@ public class TestHiveIcebergOutputCommitter {
List<Record> expected = writeRecords(table.name(), 2, 2, true, false, conf);
committer.commitJob(new JobContextImpl(conf, JOB_ID));
HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 4);
- HiveIcebergTestUtils.validateData(table, expected, 0);
+ HiveIcebergTestUtils.validateData(table, expected, 1);
}
@Test
@@ -270,6 +268,10 @@ public class TestHiveIcebergOutputCommitter {
for (int i = 0; i < taskNum; ++i) {
List<Record> records = TestHelper.generateRandomRecords(schema, RECORD_NUM, i + attemptNum);
+ // making customer_id deterministic for result comparisons
+ for (int j = 0; j < RECORD_NUM; ++j) {
+ records.get(j).setField("customer_id", j / 3L);
+ }
TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum);
int partitionId = taskId.getTaskID().getId();
String operationId = QUERY_ID + "-" + JOB_ID;
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q
index 8ea1f12..9309452 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q
@@ -1,21 +1,45 @@
+-- Mask the file size values as they can have slight variability, causing test flakiness
+--! qt:replace:/("file_size_in_bytes":)\d+/$1#Masked#/
+--! qt:replace:/("total-files-size":)\d+/$1#Masked#/
+--! qt:replace:/((ORC|PARQUET|AVRO)\s+\d+\s+)\d+/$1#Masked#/
+
drop table if exists tbl_src;
drop table if exists tbl_target_identity;
drop table if exists tbl_target_bucket;
+drop table if exists tbl_target_mixed;
-create external table tbl_src (a int, b string) stored by iceberg stored as orc;
-insert into tbl_src values (1, 'EUR'), (2, 'EUR'), (3, 'USD'), (4, 'EUR'), (5, 'HUF'), (6, 'USD'), (7, 'USD'), (8, 'PLN'), (9, 'PLN'), (10, 'CZK');
+create external table tbl_src (a int, b string, c bigint) stored by iceberg stored as orc;
+insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, 'EUR', 12), (5, 'HUF', 30), (6, 'USD', 10), (7, 'USD', 100), (8, 'PLN', 20), (9, 'PLN', 11), (10, 'CZK', 5);
--need at least 2 files to ensure ClusteredWriter encounters out-of-order records
-insert into tbl_src values (10, 'EUR'), (20, 'EUR'), (30, 'USD'), (40, 'EUR'), (50, 'HUF'), (60, 'USD'), (70, 'USD'), (80, 'PLN'), (90, 'PLN'), (100, 'CZK');
+insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12);
create external table tbl_target_identity (a int) partitioned by (ccy string) stored by iceberg stored as orc;
-explain insert overwrite table tbl_target_identity select * from tbl_src;
-insert overwrite table tbl_target_identity select * from tbl_src;
-select * from tbl_target_identity order by a;
+explain insert overwrite table tbl_target_identity select a, b from tbl_src;
+insert overwrite table tbl_target_identity select a, b from tbl_src;
+select * from tbl_target_identity order by a, ccy;
---bucketed case - although SortedDynPartitionOptimizer kicks in for this case too, its work is futile as it sorts values rather than the computed buckets
---thus we need this case to check that ClusteredWriter allows out-of-order records for bucket partition spec (only)
+--bucketed case - should invoke GenericUDFIcebergBucket to calculate buckets before sorting
create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc;
-explain insert into table tbl_target_bucket select * from tbl_src;
-insert into table tbl_target_bucket select * from tbl_src;
-select * from tbl_target_bucket order by a;
\ No newline at end of file
+explain insert into table tbl_target_bucket select a, b from tbl_src;
+insert into table tbl_target_bucket select a, b from tbl_src;
+select * from tbl_target_bucket order by a, ccy;
+
+--mixed case - 1 identity + 1 bucket cols
+create external table tbl_target_mixed (a int, ccy string, c bigint) partitioned by spec (ccy, bucket (3, c)) stored by iceberg stored as orc;
+explain insert into table tbl_target_mixed select * from tbl_src;
+insert into table tbl_target_mixed select * from tbl_src;
+select * from tbl_target_mixed order by a, ccy;
+select * from default.tbl_target_mixed.partitions;
+select * from default.tbl_target_mixed.files;
+
+--1 of 2 partition cols is folded with constant - should still sort
+explain insert into table tbl_target_mixed select * from tbl_src where b = 'EUR';
+insert into table tbl_target_mixed select * from tbl_src where b = 'EUR';
+
+--all partition cols folded - should not sort as it's not needed
+explain insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100;
+insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100;
+
+select * from tbl_target_mixed order by a, ccy;
+select * from default.tbl_target_mixed.files;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
index 656e5ba..91b3808 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
@@ -10,27 +10,31 @@ PREHOOK: query: drop table if exists tbl_target_bucket
PREHOOK: type: DROPTABLE
POSTHOOK: query: drop table if exists tbl_target_bucket
POSTHOOK: type: DROPTABLE
-PREHOOK: query: create external table tbl_src (a int, b string) stored by iceberg stored as orc
+PREHOOK: query: drop table if exists tbl_target_mixed
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_target_mixed
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table tbl_src (a int, b string, c bigint) stored by iceberg stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@tbl_src
-POSTHOOK: query: create external table tbl_src (a int, b string) stored by iceberg stored as orc
+POSTHOOK: query: create external table tbl_src (a int, b string, c bigint) stored by iceberg stored as orc
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@tbl_src
-PREHOOK: query: insert into tbl_src values (1, 'EUR'), (2, 'EUR'), (3, 'USD'), (4, 'EUR'), (5, 'HUF'), (6, 'USD'), (7, 'USD'), (8, 'PLN'), (9, 'PLN'), (10, 'CZK')
+PREHOOK: query: insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, 'EUR', 12), (5, 'HUF', 30), (6, 'USD', 10), (7, 'USD', 100), (8, 'PLN', 20), (9, 'PLN', 11), (10, 'CZK', 5)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@tbl_src
-POSTHOOK: query: insert into tbl_src values (1, 'EUR'), (2, 'EUR'), (3, 'USD'), (4, 'EUR'), (5, 'HUF'), (6, 'USD'), (7, 'USD'), (8, 'PLN'), (9, 'PLN'), (10, 'CZK')
+POSTHOOK: query: insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, 'EUR', 12), (5, 'HUF', 30), (6, 'USD', 10), (7, 'USD', 100), (8, 'PLN', 20), (9, 'PLN', 11), (10, 'CZK', 5)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_src
-PREHOOK: query: insert into tbl_src values (10, 'EUR'), (20, 'EUR'), (30, 'USD'), (40, 'EUR'), (50, 'HUF'), (60, 'USD'), (70, 'USD'), (80, 'PLN'), (90, 'PLN'), (100, 'CZK')
+PREHOOK: query: insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@tbl_src
-POSTHOOK: query: insert into tbl_src values (10, 'EUR'), (20, 'EUR'), (30, 'USD'), (40, 'EUR'), (50, 'HUF'), (60, 'USD'), (70, 'USD'), (80, 'PLN'), (90, 'PLN'), (100, 'CZK')
+POSTHOOK: query: insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_src
@@ -42,11 +46,11 @@ POSTHOOK: query: create external table tbl_target_identity (a int) partitioned b
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@tbl_target_identity
-PREHOOK: query: explain insert overwrite table tbl_target_identity select * from tbl_src
+PREHOOK: query: explain insert overwrite table tbl_target_identity select a, b from tbl_src
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_src
PREHOOK: Output: default@tbl_target_identity
-POSTHOOK: query: explain insert overwrite table tbl_target_identity select * from tbl_src
+POSTHOOK: query: explain insert overwrite table tbl_target_identity select a, b from tbl_src
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_src
POSTHOOK: Output: default@tbl_target_identity
@@ -68,10 +72,9 @@ Stage-3
File Output Operator [FS_18]
table:{"name:":"default.tbl_target_identity"}
Select Operator [SEL_17]
- Output:["_col0","_col1"]
+ Output:["_col0","_col1","_col1"]
<-Map 1 [SIMPLE_EDGE] vectorized
PARTITION_ONLY_SHUFFLE [RS_13]
- PartitionCols:_col1
Select Operator [SEL_12] (rows=20 width=91)
Output:["_col0","_col1"]
TableScan [TS_0] (rows=20 width=91)
@@ -90,19 +93,19 @@ Stage-3
Output:["a","ccy"]
Please refer to the previous Select Operator [SEL_12]
-PREHOOK: query: insert overwrite table tbl_target_identity select * from tbl_src
+PREHOOK: query: insert overwrite table tbl_target_identity select a, b from tbl_src
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_src
PREHOOK: Output: default@tbl_target_identity
-POSTHOOK: query: insert overwrite table tbl_target_identity select * from tbl_src
+POSTHOOK: query: insert overwrite table tbl_target_identity select a, b from tbl_src
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_src
POSTHOOK: Output: default@tbl_target_identity
-PREHOOK: query: select * from tbl_target_identity order by a
+PREHOOK: query: select * from tbl_target_identity order by a, ccy
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_target_identity
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_target_identity order by a
+POSTHOOK: query: select * from tbl_target_identity order by a, ccy
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_target_identity
POSTHOOK: Output: hdfs://### HDFS PATH ###
@@ -115,8 +118,8 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
7 USD
8 PLN
9 PLN
-10 EUR
10 CZK
+10 EUR
20 EUR
30 USD
40 EUR
@@ -134,11 +137,11 @@ POSTHOOK: query: create external table tbl_target_bucket (a int, ccy string) par
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@tbl_target_bucket
-PREHOOK: query: explain insert into table tbl_target_bucket select * from tbl_src
+PREHOOK: query: explain insert into table tbl_target_bucket select a, b from tbl_src
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_src
PREHOOK: Output: default@tbl_target_bucket
-POSTHOOK: query: explain insert into table tbl_target_bucket select * from tbl_src
+POSTHOOK: query: explain insert into table tbl_target_bucket select a, b from tbl_src
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_src
POSTHOOK: Output: default@tbl_target_bucket
@@ -160,10 +163,9 @@ Stage-3
File Output Operator [FS_18]
table:{"name:":"default.tbl_target_bucket"}
Select Operator [SEL_17]
- Output:["_col0","_col1"]
+ Output:["_col0","_col1","iceberg_bucket(_col1, 2)"]
<-Map 1 [SIMPLE_EDGE] vectorized
PARTITION_ONLY_SHUFFLE [RS_13]
- PartitionCols:_col1
Select Operator [SEL_12] (rows=20 width=91)
Output:["_col0","_col1"]
TableScan [TS_0] (rows=20 width=91)
@@ -182,19 +184,19 @@ Stage-3
Output:["a","ccy"]
Please refer to the previous Select Operator [SEL_12]
-PREHOOK: query: insert into table tbl_target_bucket select * from tbl_src
+PREHOOK: query: insert into table tbl_target_bucket select a, b from tbl_src
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_src
PREHOOK: Output: default@tbl_target_bucket
-POSTHOOK: query: insert into table tbl_target_bucket select * from tbl_src
+POSTHOOK: query: insert into table tbl_target_bucket select a, b from tbl_src
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_src
POSTHOOK: Output: default@tbl_target_bucket
-PREHOOK: query: select * from tbl_target_bucket order by a
+PREHOOK: query: select * from tbl_target_bucket order by a, ccy
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_target_bucket
PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_target_bucket order by a
+POSTHOOK: query: select * from tbl_target_bucket order by a, ccy
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_target_bucket
POSTHOOK: Output: hdfs://### HDFS PATH ###
@@ -207,8 +209,8 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
7 USD
8 PLN
9 PLN
-10 EUR
10 CZK
+10 EUR
20 EUR
30 USD
40 EUR
@@ -218,3 +220,319 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
80 PLN
90 PLN
100 CZK
+PREHOOK: query: create external table tbl_target_mixed (a int, ccy string, c bigint) partitioned by spec (ccy, bucket (3, c)) stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_target_mixed
+POSTHOOK: query: create external table tbl_target_mixed (a int, ccy string, c bigint) partitioned by spec (ccy, bucket (3, c)) stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_target_mixed
+PREHOOK: query: explain insert into table tbl_target_mixed select * from tbl_src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_src
+PREHOOK: Output: default@tbl_target_mixed
+POSTHOOK: query: explain insert into table tbl_target_mixed select * from tbl_src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_src
+POSTHOOK: Output: default@tbl_target_mixed
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Reducer 2 <- Map 1 (SIMPLE_EDGE)
+Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+
+Stage-3
+ Stats Work{}
+ Stage-0
+ Move Operator
+ table:{"name:":"default.tbl_target_mixed"}
+ Stage-2
+ Dependency Collection{}
+ Stage-1
+ Reducer 2 vectorized
+ File Output Operator [FS_18]
+ table:{"name:":"default.tbl_target_mixed"}
+ Select Operator [SEL_17]
+ Output:["_col0","_col1","_col2","_col1","iceberg_bucket(_col2, 3)"]
+ <-Map 1 [SIMPLE_EDGE] vectorized
+ PARTITION_ONLY_SHUFFLE [RS_13]
+ Select Operator [SEL_12] (rows=20 width=99)
+ Output:["_col0","_col1","_col2"]
+ TableScan [TS_0] (rows=20 width=99)
+ default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"]
+ Reducer 3 vectorized
+ File Output Operator [FS_21]
+ Select Operator [SEL_20] (rows=1 width=794)
+ Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17"]
+ Group By Operator [GBY_19] (rows=1 width=500)
+ Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)","min(VALUE._col9)","max(VALUE._col10)","count(VALUE._col11)","compute_bit_vector_hll(VALUE._col12)"]
+ <-Map 1 [CUSTOM_SIMPLE_EDGE] vectorized
+ PARTITION_ONLY_SHUFFLE [RS_16]
+ Group By Operator [GBY_15] (rows=1 width=568)
+ Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"]
+ Select Operator [SEL_14] (rows=20 width=99)
+ Output:["a","ccy","c"]
+ Please refer to the previous Select Operator [SEL_12]
+
+PREHOOK: query: insert into table tbl_target_mixed select * from tbl_src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_src
+PREHOOK: Output: default@tbl_target_mixed
+POSTHOOK: query: insert into table tbl_target_mixed select * from tbl_src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_src
+POSTHOOK: Output: default@tbl_target_mixed
+PREHOOK: query: select * from tbl_target_mixed order by a, ccy
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_target_mixed
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from tbl_target_mixed order by a, ccy
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_target_mixed
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 EUR 10
+2 EUR 10
+3 USD 11
+4 EUR 12
+5 HUF 30
+6 USD 10
+7 USD 100
+8 PLN 20
+9 PLN 11
+10 CZK 5
+10 EUR 12
+20 EUR 11
+30 USD 100
+40 EUR 10
+50 HUF 30
+60 USD 12
+70 USD 20
+80 PLN 100
+90 PLN 18
+100 CZK 12
+PREHOOK: query: select * from default.tbl_target_mixed.partitions
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_target_mixed
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.tbl_target_mixed.partitions
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_target_mixed
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+{"ccy":"EUR","c_bucket":0} 1 1
+{"ccy":"EUR","c_bucket":1} 2 1
+{"ccy":"HUF","c_bucket":1} 2 1
+{"ccy":"EUR","c_bucket":2} 3 1
+{"ccy":"USD","c_bucket":1} 3 1
+{"ccy":"CZK","c_bucket":1} 1 1
+{"ccy":"USD","c_bucket":0} 2 1
+{"ccy":"USD","c_bucket":2} 1 1
+{"ccy":"CZK","c_bucket":2} 1 1
+{"ccy":"PLN","c_bucket":2} 1 1
+{"ccy":"PLN","c_bucket":0} 2 1
+{"ccy":"PLN","c_bucket":1} 1 1
+PREHOOK: query: select * from default.tbl_target_mixed.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_target_mixed
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.tbl_target_mixed.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_target_mixed
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"CZK","c_bucket":1} 1 449 {1:6,2:12,3:6} {1:1,2:1,3:1} {1:0,2:0,3:0} {} {1:d ,2:CZK,3: } {1:d ,2:CZK,3: } NULL [3] NULL 0
+0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"CZK","c_bucket":2} 1 432 {1:6,2:12,3:6} {1:1,2:1,3:1} {1:0,2:0,3:0} {} {1:
+ ,2:CZK,3: } {1:
+ ,2:CZK,3: } NULL [3] NULL 0
+0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"EUR","c_bucket":0} 1 432 {1:6,2:12,3:6} {1:1,2:1,3:1} {1:0,2:0,3:0} {} {1: ,2:EUR,3: } {1: ,2:EUR,3: } NULL [3] NULL 0
+0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"EUR","c_bucket":1} 2 448 {1:7,2:18,3:7} {1:2,2:2,3:2} {1:0,2:0,3:0} {} {1: ,2:EUR,3: } {1:
+ ,2:EUR,3: } NULL [3] NULL 0
+0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"EUR","c_bucket":2} 3 448 {1:8,2:17,3:5} {1:3,2:3,3:3} {1:0,2:0,3:0} {} {1: ,2:EUR,3:
+ } {1:( ,2:EUR,3:
+ } NULL [3] NULL 0
+0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"HUF","c_bucket":1} 2 448 {1:7,2:18,3:7} {1:2,2:2,3:2} {1:0,2:0,3:0} {} {1: ,2:HUF,3: } {1:2 ,2:HUF,3: } NULL [3] NULL 0
+0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"PLN","c_bucket":0} 2 448 {1:7,2:18,3:7} {1:2,2:2,3:2} {1:0,2:0,3:0} {} {1: ,2:PLN,3: } {1: ,2:PLN,3: } NULL [3] NULL 0
+0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"PLN","c_bucket":1} 1 448 {1:6,2:12,3:6} {1:1,2:1,3:1} {1:0,2:0,3:0} {} {1:P ,2:PLN,3:d } {1:P ,2:PLN,3:d } NULL [3] NULL 0
+0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"PLN","c_bucket":2} 1 449 {1:6,2:12,3:6} {1:1,2:1,3:1} {1:0,2:0,3:0} {} {1:Z ,2:PLN,3: } {1:Z ,2:PLN,3: } NULL [3] NULL 0
+0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"USD","c_bucket":0} 2 462 {1:7,2:18,3:7} {1:2,2:2,3:2} {1:0,2:0,3:0} {} {1: ,2:USD,3: } {1:F ,2:USD,3: } NULL [3] NULL 0
+0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"USD","c_bucket":1} 3 470 {1:8,2:17,3:8} {1:3,2:3,3:3} {1:0,2:0,3:0} {} {1: ,2:USD,3: } {1:<