You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2016/08/09 19:44:02 UTC
hive git commit: HIVE-7239 Fix bug in HiveIndexedInputFormat
implementation that causes incorrect query result when input backed by
Sequence/RC files (Illya Yalovyy via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master ca807c63c -> 2ef3ab855
HIVE-7239 Fix bug in HiveIndexedInputFormat implementation that causes incorrect query result when input backed by Sequence/RC files (Illya Yalovyy via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2ef3ab85
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2ef3ab85
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2ef3ab85
Branch: refs/heads/master
Commit: 2ef3ab855c725829cabf2c93b9fc9e31553ca643
Parents: ca807c6
Author: Illya Yalovyy <ya...@amazon.com>
Authored: Tue Jul 26 22:09:00 2016 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Aug 9 12:43:39 2016 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/index/HiveIndexResult.java | 3 +-
.../hive/ql/index/HiveIndexedInputFormat.java | 43 +--
.../hadoop/hive/ql/index/IndexResult.java | 25 ++
.../hadoop/hive/ql/index/SplitFilter.java | 125 ++++++++
.../hive/ql/index/MockHiveInputSplits.java | 37 +++
.../hadoop/hive/ql/index/MockIndexResult.java | 38 +++
.../hadoop/hive/ql/index/MockInputFile.java | 130 ++++++++
.../hive/ql/index/SplitFilterTestCase.java | 153 ++++++++++
.../ql/index/TestHiveInputSplitComparator.java | 64 ++++
.../hadoop/hive/ql/index/TestSplitFilter.java | 296 +++++++++++++++++++
10 files changed, 874 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
index 33cc5c3..586cd68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.mapred.LineRecordReader.LineReader;
* HiveIndexResult parses the input stream from an index query
* to generate a list of file splits to query.
*/
-public class HiveIndexResult {
+public class HiveIndexResult implements IndexResult {
public static final Logger l4j =
LoggerFactory.getLogger(HiveIndexResult.class.getSimpleName());
@@ -182,6 +182,7 @@ public class HiveIndexResult {
bucket.getOffsets().add(Long.parseLong(one_offset));
}
+ @Override
public boolean contains(FileSplit split) throws HiveException {
if (buckets == null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
index 5247ece..0e6ec84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Iterator;
import java.util.Set;
import java.util.Arrays;
+import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.IOPrepareCache;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
@@ -144,49 +144,14 @@ public class HiveIndexedInputFormat extends HiveInputFormat {
HiveInputSplit[] splits = (HiveInputSplit[]) this.doGetSplits(job, numSplits);
- ArrayList<HiveInputSplit> newSplits = new ArrayList<HiveInputSplit>(
- numSplits);
-
long maxInputSize = HiveConf.getLongVar(job, ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE);
if (maxInputSize < 0) {
maxInputSize=Long.MAX_VALUE;
}
- long sumSplitLengths = 0;
- for (HiveInputSplit split : splits) {
- l4j.info("split start : " + split.getStart());
- l4j.info("split end : " + (split.getStart() + split.getLength()));
+ SplitFilter filter = new SplitFilter(hiveIndexResult, maxInputSize);
+ Collection<HiveInputSplit> newSplits = filter.filter(splits);
- try {
- if (hiveIndexResult.contains(split)) {
- // we may miss a sync here
- HiveInputSplit newSplit = split;
- if (split.inputFormatClassName().contains("RCFile")
- || split.inputFormatClassName().contains("SequenceFile")) {
- if (split.getStart() > SequenceFile.SYNC_INTERVAL) {
- newSplit = new HiveInputSplit(new FileSplit(split.getPath(),
- split.getStart() - SequenceFile.SYNC_INTERVAL,
- split.getLength() + SequenceFile.SYNC_INTERVAL,
- split.getLocations()),
- split.inputFormatClassName());
- }
- }
- sumSplitLengths += newSplit.getLength();
- if (sumSplitLengths > maxInputSize) {
- throw new IOException(
- "Size of data to read during a compact-index-based query exceeded the maximum of "
- + maxInputSize + " set in " + ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE.varname);
- }
- newSplits.add(newSplit);
- }
- } catch (HiveException e) {
- throw new RuntimeException(
- "Unable to get metadata for input table split" + split.getPath(), e);
- }
- }
- InputSplit retA[] = newSplits.toArray((new FileSplit[newSplits.size()]));
- l4j.info("Number of input splits: " + splits.length + " new input splits: "
- + retA.length + ", sum of split lengths: " + sumSplitLengths);
- return retA;
+ return newSplits.toArray(new FileSplit[newSplits.size()]);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/java/org/apache/hadoop/hive/ql/index/IndexResult.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/IndexResult.java b/ql/src/java/org/apache/hadoop/hive/ql/index/IndexResult.java
new file mode 100644
index 0000000..44e4f76
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/IndexResult.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Result of an index query, used to decide which file splits need to be read.
+ *
+ * <p>{@link SplitFilter} keeps a split only when {@link #contains(FileSplit)} returns true.
+ */
+public interface IndexResult {
+
+  /**
+   * Checks whether the given split is selected by the index query.
+   *
+   * @param split the split to check
+   * @return {@code true} if the split has to be read, {@code false} if it can be skipped
+   * @throws HiveException if the check fails
+   */
+  boolean contains(FileSplit split) throws HiveException;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/java/org/apache/hadoop/hive/ql/index/SplitFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/SplitFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/index/SplitFilter.java
new file mode 100644
index 0000000..8b339ec
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/SplitFilter.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Selects the input splits an index-based query has to read.
+ *
+ * <p>A split is kept only when the {@link IndexResult} contains it. A kept RCFile or SequenceFile
+ * split that does not start at offset 0 is widened backwards by up to
+ * {@link SequenceFile#SYNC_INTERVAL} bytes, so that a sync marker just before its original start
+ * is not missed, unless the previously kept split of the same file already reaches that far.
+ * The total length of the kept splits is limited to a maximum input size.
+ */
+public final class SplitFilter {
+  public static final Logger LOG = LoggerFactory.getLogger(SplitFilter.class);
+
+  private final IndexResult indexResult;
+  private final long maxInputSize;
+
+  /**
+   * Creates a filter backed by the given index result.
+   *
+   * @param indexResult decides which splits are kept; must not be null
+   * @param maxInputSize maximum total length, in bytes, of the splits returned by
+   *     {@link #filter(HiveInputSplit[])}
+   * @throws NullPointerException if {@code indexResult} is null
+   */
+  public SplitFilter(IndexResult indexResult, long maxInputSize) {
+    this.indexResult = Objects.requireNonNull(indexResult, "indexResult");
+    this.maxInputSize = maxInputSize;
+  }
+
+  /**
+   * Returns the splits selected by the index result, widened where needed.
+   *
+   * <p>The returned splits are ordered by path and start offset. The given array is left
+   * untouched.
+   *
+   * @param splits all candidate splits
+   * @return the selected, possibly widened, splits
+   * @throws IOException if the total length of the selected splits exceeds the maximum input size
+   * @throws RuntimeException if the index result fails for a split
+   */
+  public List<HiveInputSplit> filter(HiveInputSplit[] splits) throws IOException {
+    // Work on a sorted copy: doesOverlap() relies on the splits of one file being visited in
+    // start-offset order, and sorting in place would silently reorder the caller's array.
+    HiveInputSplit[] sortedSplits = Arrays.copyOf(splits, splits.length);
+    Arrays.sort(sortedSplits, new HiveInputSplitComparator());
+
+    long sumSplitLengths = 0;
+    List<HiveInputSplit> newSplits = new ArrayList<>();
+    for (HiveInputSplit split : sortedSplits) {
+      LOG.info("split start : {}", split.getStart());
+      LOG.info("split end : {}", split.getStart() + split.getLength());
+
+      try {
+        if (indexResult.contains(split)) {
+          HiveInputSplit newSplit = split;
+          if (isAdjustmentRequired(newSplits, split)) {
+            newSplit = adjustSplit(split);
+          }
+          sumSplitLengths += newSplit.getLength();
+          if (sumSplitLengths > maxInputSize) {
+            String messageTemplate = "Size of data to read during a compact-index-based query " +
+                "exceeded the maximum of %d set in %s";
+            throw new IOException(String.format(messageTemplate, maxInputSize,
+                HiveConf.ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE.varname));
+          }
+          newSplits.add(newSplit);
+        }
+      } catch (HiveException e) {
+        throw new RuntimeException("Unable to get metadata for input table split " +
+            split.getPath(), e);
+      }
+    }
+    LOG.info("Number of input splits: {}, new input splits: {}, sum of split lengths: {}",
+        splits.length, newSplits.size(), sumSplitLengths);
+    return newSplits;
+  }
+
+  /**
+   * Tells whether a selected split must be widened: it comes from an RCFile or SequenceFile
+   * input, does not start at offset 0, and its widened start is not already reached by the
+   * previously selected split of the same file.
+   */
+  private boolean isAdjustmentRequired(List<HiveInputSplit> newSplits, HiveInputSplit split) {
+    String inputFormat = split.inputFormatClassName();
+    boolean syncBasedFormat =
+        inputFormat.contains("RCFile") || inputFormat.contains("SequenceFile");
+    return syncBasedFormat && split.getStart() > 0 &&
+        !doesOverlap(newSplits, split.getPath(), adjustStart(split.getStart()));
+  }
+
+  /**
+   * Returns true if the last selected split belongs to {@code path} and ends after
+   * {@code start}.
+   */
+  private boolean doesOverlap(List<HiveInputSplit> newSplits, Path path, long start) {
+    if (newSplits.isEmpty()) {
+      return false;
+    }
+    HiveInputSplit lastSplit = Iterables.getLast(newSplits);
+    if (lastSplit.getPath().equals(path)) {
+      return lastSplit.getStart() + lastSplit.getLength() > start;
+    }
+    return false;
+  }
+
+  /** Moves {@code start} back by {@link SequenceFile#SYNC_INTERVAL}, but not below 0. */
+  private long adjustStart(long start) {
+    return start > SequenceFile.SYNC_INTERVAL ? start - SequenceFile.SYNC_INTERVAL : 0;
+  }
+
+  /** Returns a copy of {@code split} starting at the adjusted start, with the same end offset. */
+  private HiveInputSplit adjustSplit(HiveInputSplit split) throws IOException {
+    long adjustedStart = adjustStart(split.getStart());
+    return new HiveInputSplit(new FileSplit(split.getPath(), adjustedStart,
+        split.getStart() - adjustedStart + split.getLength(), split.getLocations()),
+        split.inputFormatClassName());
+  }
+
+  /** Orders splits by path, then by start offset. */
+  @VisibleForTesting
+  static final class HiveInputSplitComparator implements Comparator<HiveInputSplit> {
+    @Override
+    public int compare(HiveInputSplit o1, HiveInputSplit o2) {
+      int pathCompare = comparePath(o1.getPath(), o2.getPath());
+      if (pathCompare != 0) {
+        return pathCompare;
+      }
+      return Long.compare(o1.getStart(), o2.getStart());
+    }
+
+    private int comparePath(Path p1, Path p2) {
+      return p1.compareTo(p2);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/MockHiveInputSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/MockHiveInputSplits.java b/ql/src/test/org/apache/hadoop/hive/ql/index/MockHiveInputSplits.java
new file mode 100644
index 0000000..7815ed0
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/index/MockHiveInputSplits.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+/**
+ * Test helper that creates {@link HiveInputSplit} instances for the split filter tests.
+ */
+public final class MockHiveInputSplits {
+  /** Mock splits carry no host locations. */
+  private static final String[] HOSTS = {};
+  /**
+   * Input format reported by every mock split; its name contains "SequenceFile", so
+   * {@link SplitFilter} treats mock splits as SequenceFile splits.
+   */
+  private static final String INPUT_FORMAT_CLASS_NAME =
+      SequenceFileInputFormat.class.getCanonicalName();
+
+  /** Holder of static factory methods; not instantiable. */
+  private MockHiveInputSplits() {
+  }
+
+  /**
+   * Creates a split covering {@code length} bytes of {@code pathString} from {@code start}.
+   *
+   * @param pathString path of the file the split belongs to
+   * @param start offset of the first byte of the split
+   * @param length number of bytes in the split
+   * @return a {@link HiveInputSplit} wrapping a {@link FileSplit} without host locations
+   */
+  public static HiveInputSplit createMockSplit(String pathString, long start, long length) {
+    Path file = new Path(pathString);
+    InputSplit fileSplit = new FileSplit(file, start, length, HOSTS);
+    return new HiveInputSplit(fileSplit, INPUT_FORMAT_CLASS_NAME);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/MockIndexResult.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/MockIndexResult.java b/ql/src/test/org/apache/hadoop/hive/ql/index/MockIndexResult.java
new file mode 100644
index 0000000..95d069c
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/index/MockIndexResult.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * {@link IndexResult} for tests that selects exactly the splits it was created with.
+ */
+public final class MockIndexResult implements IndexResult {
+
+  /** Immutable snapshot of the splits reported as selected. */
+  private final ImmutableSet<HiveInputSplit> selected;
+
+  /**
+   * Creates an index result that selects the given splits.
+   *
+   * @param selectedSplits splits for which {@link #contains(FileSplit)} returns true; copied
+   */
+  public MockIndexResult(Collection<HiveInputSplit> selectedSplits) {
+    selected = ImmutableSet.copyOf(selectedSplits);
+  }
+
+  /**
+   * Returns whether {@code split} is one of the selected splits. Membership is decided by the
+   * set, i.e. by the splits' {@code equals} and {@code hashCode}.
+   */
+  @Override
+  public boolean contains(FileSplit split) throws HiveException {
+    return selected.contains(split);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/MockInputFile.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/MockInputFile.java b/ql/src/test/org/apache/hadoop/hive/ql/index/MockInputFile.java
new file mode 100644
index 0000000..4619b6c
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/index/MockInputFile.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+
+/**
+ * A mock input file described as a run of consecutive splits, some of them marked as selected.
+ * Instances are created with the staged builder returned by {@link #builder()}, for example
+ * {@code MockInputFile.builder().path("A").split().selectedSplit().build()}.
+ */
+public final class MockInputFile {
+  private final String path;
+  private final ImmutableList<HiveInputSplit> splits;
+  private final ImmutableList<HiveInputSplit> selectedSplits;
+
+  private MockInputFile(String path, List<HiveInputSplit> splits,
+      List<HiveInputSplit> selectedSplits) {
+    this.path = path;
+    this.splits = ImmutableList.copyOf(splits);
+    this.selectedSplits = ImmutableList.copyOf(selectedSplits);
+  }
+
+  /** Returns the path shared by all splits of this file. */
+  public String getPath() {
+    return path;
+  }
+
+  /** Returns every split of the file, in the order they were added (increasing offsets). */
+  public List<HiveInputSplit> getSplits() {
+    return splits;
+  }
+
+  /** Returns the splits that were added as selected; a subset of {@link #getSplits()}. */
+  public List<HiveInputSplit> getSelectedSplits() {
+    return selectedSplits;
+  }
+
+  /** Starts building a mock input file; the first step sets its path. */
+  public static PathStep builder() {
+    return new MockInputFileBuilder();
+  }
+
+  /** Builder step that sets the file path. */
+  public interface PathStep {
+    DefaultSplitLengthStep path(String path);
+  }
+
+  /** Optional builder step that changes the length used by the no-argument split methods. */
+  public interface DefaultSplitLengthStep extends SplitStep {
+    SplitStep defaultSplitLength(long defaultSplitLength);
+  }
+
+  /**
+   * Builder step that appends splits. The no-argument methods append a split of the default
+   * length; the methods taking a length append a last split of that length, after which only
+   * {@code build()} is available.
+   */
+  public interface SplitStep {
+    SplitStep split();
+    SplitStep selectedSplit();
+    LastSplitStep split(long lastSplitSize);
+    LastSplitStep selectedSplit(long lastSplitSize);
+    MockInputFile build();
+  }
+
+  /** Final builder step, reached after a split with an explicit length. */
+  public interface LastSplitStep {
+    MockInputFile build();
+  }
+
+  private static final class MockInputFileBuilder implements PathStep, SplitStep, LastSplitStep,
+      DefaultSplitLengthStep {
+
+    private String path;
+    private long defaultSplitSize = SplitFilterTestCase.DEFAULT_SPLIT_SIZE;
+    private final List<HiveInputSplit> splits = new ArrayList<>();
+    private final List<HiveInputSplit> selectedSplits = new ArrayList<>();
+    // Start offset of the next split to be appended.
+    private long position = 0;
+
+    @Override
+    public DefaultSplitLengthStep path(String path) {
+      this.path = path;
+      return this;
+    }
+
+    @Override
+    public SplitStep split() {
+      nextSplit(defaultSplitSize);
+      return this;
+    }
+
+    @Override
+    public LastSplitStep split(long lastSplitSize) {
+      nextSplit(lastSplitSize);
+      return this;
+    }
+
+    @Override
+    public SplitStep selectedSplit() {
+      selectedSplits.add(nextSplit(defaultSplitSize));
+      return this;
+    }
+
+    @Override
+    public LastSplitStep selectedSplit(long lastSplitSize) {
+      selectedSplits.add(nextSplit(lastSplitSize));
+      return this;
+    }
+
+    @Override
+    public SplitStep defaultSplitLength(long defaultSplitLength) {
+      this.defaultSplitSize = defaultSplitLength;
+      return this;
+    }
+
+    /** Appends a split of {@code splitSize} bytes at the current position and advances it. */
+    private HiveInputSplit nextSplit(long splitSize) {
+      HiveInputSplit split = MockHiveInputSplits.createMockSplit(path, position, splitSize);
+      position += splitSize;
+      splits.add(split);
+      return split;
+    }
+
+    @Override
+    public MockInputFile build() {
+      return new MockInputFile(path, splits, selectedSplits);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/SplitFilterTestCase.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/SplitFilterTestCase.java b/ql/src/test/org/apache/hadoop/hive/ql/index/SplitFilterTestCase.java
new file mode 100644
index 0000000..394dc74
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/index/SplitFilterTestCase.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public final class SplitFilterTestCase {
+ public static final long DEFAULT_SPLIT_SIZE = 1024 * 1024;
+ public static final long SMALL_SPLIT_SIZE = 500;
+
+ private final Set<HiveInputSplit> allSplits;
+ private final Set<HiveInputSplit> selectedSplits;
+ private final Set<HiveInputSplit> expectedSplits;
+ private final long maxInputSize;
+
+ private SplitFilterTestCase(Iterable<HiveInputSplit> allSplits,
+ Iterable<HiveInputSplit> selectedSplits, Iterable<HiveInputSplit> expectedSplits,
+ long maxInputSize) {
+
+ this.allSplits = ImmutableSet.copyOf(allSplits);
+ this.selectedSplits = ImmutableSet.copyOf(selectedSplits);
+ this.expectedSplits = ImmutableSet.copyOf(expectedSplits);
+ this.maxInputSize = maxInputSize;
+ }
+
+ private HiveInputSplit[] toArray(Collection<HiveInputSplit> splits) {
+ return splits.toArray(new HiveInputSplit[splits.size()]);
+ }
+
+ public void executeAndValidate() throws IOException {
+ SplitFilter filter = new SplitFilter(new MockIndexResult(selectedSplits), maxInputSize);
+ List<HiveInputSplit> actualSplits = filter.filter(toArray(allSplits));
+ assertSplits(expectedSplits, actualSplits);
+ }
+
+ private void assertSplits(Collection<HiveInputSplit> expectedSplits,
+ Collection<HiveInputSplit> actualSplits) {
+ SplitFilter.HiveInputSplitComparator hiveInputSplitComparator =
+ new SplitFilter.HiveInputSplitComparator();
+
+ List<HiveInputSplit> sortedExpectedSplits = new ArrayList<>(expectedSplits);
+ Collections.sort(sortedExpectedSplits, hiveInputSplitComparator);
+
+ List<HiveInputSplit> sortedActualSplits = new ArrayList<>(actualSplits);
+ Collections.sort(sortedActualSplits, hiveInputSplitComparator);
+
+ assertEquals("Number of selected splits.", sortedExpectedSplits.size(),
+ sortedActualSplits.size());
+
+ for (int i = 0; i < sortedExpectedSplits.size(); i++) {
+ HiveInputSplit expectedSplit = sortedExpectedSplits.get(i);
+ HiveInputSplit actualSplit = sortedActualSplits.get(i);
+
+ String splitName = "Split #" + i;
+
+ assertEquals(splitName + " path.", expectedSplit.getPath(), actualSplit.getPath());
+ assertEquals(splitName + " start.", expectedSplit.getStart(), actualSplit.getStart());
+ assertEquals(splitName + " length.", expectedSplit.getLength(), actualSplit.getLength());
+ }
+ }
+
+ public static MaxInputSizeStep builder() {
+ return new SplitFilterTestCaseBuilder();
+ }
+
+ public static interface MaxInputSizeStep extends InputFilesStep {
+ InputFilesStep maxInputSize(long maxInputSize);
+ }
+
+ public static interface InputFilesStep {
+ ExpectedSplitsStep inputFiles(MockInputFile... inputFiles);
+ }
+
+ public static interface ExpectedSplitsStep {
+ BuildStep expectedSplits(HiveInputSplit... expectedSplits);
+ }
+
+ public static interface BuildStep {
+ SplitFilterTestCase build();
+ }
+
+ private static final class SplitFilterTestCaseBuilder implements MaxInputSizeStep, InputFilesStep,
+ ExpectedSplitsStep, BuildStep {
+
+ private long maxInputSize = Long.MAX_VALUE;
+ private List<MockInputFile> inputFiles;
+ private List<HiveInputSplit> expectedSplits;
+
+ @Override
+ public InputFilesStep maxInputSize(long maxInputSize) {
+ this.maxInputSize = maxInputSize;
+ return this;
+ }
+
+ @Override
+ public ExpectedSplitsStep inputFiles(MockInputFile... inputFiles) {
+ this.inputFiles = Arrays.asList(inputFiles);
+ return this;
+ }
+
+ @Override
+ public BuildStep expectedSplits(HiveInputSplit... expectedSplits) {
+ this.expectedSplits = Arrays.asList(expectedSplits);
+ return this;
+ }
+
+ @Override
+ public SplitFilterTestCase build() {
+ List<HiveInputSplit> allSplits = new ArrayList<>();
+ List<HiveInputSplit> selectedSplits = new ArrayList<>();
+ Set<String> seenPaths = new HashSet<String>();
+
+ for (MockInputFile inputFile : inputFiles) {
+ if (seenPaths.add(inputFile.getPath())) {
+ allSplits.addAll(inputFile.getSplits());
+ selectedSplits.addAll(inputFile.getSelectedSplits());
+ } else {
+ fail(String.format("Cannot add 2 input files with the same path to a test case. " +
+ "The duplicated path is '%s'.", inputFile.getPath()));
+ }
+ }
+
+ return new SplitFilterTestCase(allSplits, selectedSplits, expectedSplits, maxInputSize);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/TestHiveInputSplitComparator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/TestHiveInputSplitComparator.java b/ql/src/test/org/apache/hadoop/hive/ql/index/TestHiveInputSplitComparator.java
new file mode 100644
index 0000000..ec8eb3a
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/index/TestHiveInputSplitComparator.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.util.Arrays;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.hive.ql.index.SplitFilter.HiveInputSplitComparator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.apache.hadoop.hive.ql.index.MockHiveInputSplits.createMockSplit;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks the order defined by {@link HiveInputSplitComparator}: by path first, then by start
+ * offset; the split length is not taken into account.
+ */
+@RunWith(Parameterized.class)
+public class TestHiveInputSplitComparator {
+
+  @Parameter(0)
+  public HiveInputSplit split1;
+  @Parameter(1)
+  public HiveInputSplit split2;
+  /** Expected sign of {@code compare(split1, split2)}: -1, 0 or 1. */
+  @Parameter(2)
+  public Integer expected;
+
+  @Parameters(name = "{index}: {0}<=>{1} ")
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        // Same path: ordered by start offset.
+        {createMockSplit("A", 0, 100), createMockSplit("A", 1000, 100), -1},
+        {createMockSplit("A", 1000, 100), createMockSplit("A", 100, 100), 1},
+        {createMockSplit("A", 0, 100), createMockSplit("A", 0, 100), 0},
+        // Same path and start: the length does not affect the order.
+        {createMockSplit("A", 0, 100), createMockSplit("A", 0, 200), 0},
+        // Different paths: ordered by path, regardless of the start offsets.
+        {createMockSplit("A", 0, 100), createMockSplit("B", 0, 100), -1},
+        {createMockSplit("A", 100, 100), createMockSplit("B", 0, 100), -1},
+        {createMockSplit("B", 0, 100), createMockSplit("A", 100, 100), 1}}
+    );
+  }
+
+  @Test
+  public void testCompare() {
+    HiveInputSplitComparator cmp = new HiveInputSplitComparator();
+    int actual = cmp.compare(split1, split2);
+    assertCompareResult(expected, actual);
+  }
+
+  /** Compares only the sign of {@code actual}, since a comparator may return any magnitude. */
+  private void assertCompareResult(int expected, int actual) {
+    assertEquals(expected, (int) Math.signum(actual));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/TestSplitFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/TestSplitFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/index/TestSplitFilter.java
new file mode 100644
index 0000000..f2e2c8b
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/index/TestSplitFilter.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import java.io.IOException;
+import org.junit.Test;
+
+import static org.apache.hadoop.hive.ql.index.MockHiveInputSplits.createMockSplit;
+import static org.apache.hadoop.io.SequenceFile.SYNC_INTERVAL;
+import static org.apache.hadoop.hive.ql.index.SplitFilterTestCase.DEFAULT_SPLIT_SIZE;
+import static org.apache.hadoop.hive.ql.index.SplitFilterTestCase.SMALL_SPLIT_SIZE;
+
+public class TestSplitFilter {
+ private SplitFilterTestCase testCase;
+
+ @Test
+ public void testOneSelectedSplitsInMiddle() throws Exception {
+ testCase = SplitFilterTestCase.builder()
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .split()
+ .selectedSplit()
+ .split()
+ .build()
+ )
+ .expectedSplits(
+ createMockSplit("A", DEFAULT_SPLIT_SIZE - SYNC_INTERVAL, DEFAULT_SPLIT_SIZE + SYNC_INTERVAL)
+ )
+ .build();
+
+ testCase.executeAndValidate();
+ }
+
+ @Test
+ public void testSelectedFirstSplit() throws Exception {
+ testCase = SplitFilterTestCase.builder()
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .selectedSplit()
+ .split()
+ .split()
+ .build()
+ )
+ .expectedSplits(
+ createMockSplit("A", 0, DEFAULT_SPLIT_SIZE)
+ )
+ .build();
+
+ testCase.executeAndValidate();
+ }
+
+ @Test
+ public void testSelectedLastSplit() throws Exception {
+ int lastSplitSize = 1234;
+
+ testCase = SplitFilterTestCase.builder()
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .split()
+ .selectedSplit(lastSplitSize)
+ .build()
+ )
+ .expectedSplits(
+ createMockSplit("A", DEFAULT_SPLIT_SIZE - SYNC_INTERVAL, lastSplitSize + SYNC_INTERVAL)
+ )
+ .build();
+
+ testCase.executeAndValidate();
+ }
+
+ @Test
+ public void testSelectedTwoAdjacentSplits() throws Exception {
+
+ testCase = SplitFilterTestCase.builder()
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .selectedSplit()
+ .selectedSplit()
+ .split()
+ .build()
+ )
+ .expectedSplits(
+ createMockSplit("A", 0, DEFAULT_SPLIT_SIZE),
+ createMockSplit("A", DEFAULT_SPLIT_SIZE, DEFAULT_SPLIT_SIZE)
+ )
+ .build();
+
+ testCase.executeAndValidate();
+ }
+
+ @Test
+ public void testSelectedThreeAdjacentSplits() throws Exception {
+
+ testCase = SplitFilterTestCase.builder()
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .selectedSplit()
+ .selectedSplit()
+ .selectedSplit()
+ .split()
+ .build()
+ )
+ .expectedSplits(
+ createMockSplit("A", 0, DEFAULT_SPLIT_SIZE),
+ createMockSplit("A", DEFAULT_SPLIT_SIZE, DEFAULT_SPLIT_SIZE),
+ createMockSplit("A", DEFAULT_SPLIT_SIZE * 2, DEFAULT_SPLIT_SIZE)
+ )
+ .build();
+
+ testCase.executeAndValidate();
+ }
+
+ @Test
+ public void testSelectedSplitsInTwoFiles() throws Exception {
+
+ testCase = SplitFilterTestCase.builder()
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .selectedSplit()
+ .split()
+ .build(),
+ MockInputFile.builder()
+ .path("B")
+ .selectedSplit()
+ .build()
+ )
+ .expectedSplits(
+ createMockSplit("A", 0, DEFAULT_SPLIT_SIZE),
+ createMockSplit("B", 0, DEFAULT_SPLIT_SIZE)
+ )
+ .build();
+
+ testCase.executeAndValidate();
+ }
+
+ @Test
+ public void testOverlapWithPreviousFile() throws Exception {
+
+ testCase = SplitFilterTestCase.builder()
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .selectedSplit()
+ .build(),
+ MockInputFile.builder()
+ .path("B")
+ .split()
+ .selectedSplit()
+ .build()
+ )
+ .expectedSplits(
+ createMockSplit("A", 0, DEFAULT_SPLIT_SIZE),
+ createMockSplit("B", DEFAULT_SPLIT_SIZE - SYNC_INTERVAL, DEFAULT_SPLIT_SIZE + SYNC_INTERVAL)
+ )
+ .build();
+
+ testCase.executeAndValidate();
+ }
+
+ @Test
+ public void testOverlapInSecondFile() throws Exception {
+
+ testCase = SplitFilterTestCase.builder()
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .selectedSplit()
+ .build(),
+ MockInputFile.builder()
+ .path("B")
+ .split()
+ .selectedSplit()
+ .selectedSplit()
+ .build()
+ )
+ .expectedSplits(
+ createMockSplit("A", 0, DEFAULT_SPLIT_SIZE),
+ createMockSplit("B", DEFAULT_SPLIT_SIZE - SYNC_INTERVAL, DEFAULT_SPLIT_SIZE + SYNC_INTERVAL),
+ createMockSplit("B", DEFAULT_SPLIT_SIZE * 2, DEFAULT_SPLIT_SIZE)
+ )
+ .build();
+
+ testCase.executeAndValidate();
+ }
+
+ @Test
+ public void testSmallSplitsLengthAdjustment() throws Exception {
+
+ testCase = SplitFilterTestCase.builder()
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .defaultSplitLength(SMALL_SPLIT_SIZE)
+ .split()
+ .selectedSplit()
+ .build()
+ )
+ .expectedSplits(
+ createMockSplit("A", 0, SMALL_SPLIT_SIZE * 2)
+ )
+ .build();
+
+ testCase.executeAndValidate();
+ }
+
+ @Test
+ public void testSmallSplitsOverlap() throws Exception {
+
+ testCase = SplitFilterTestCase.builder()
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .defaultSplitLength(SMALL_SPLIT_SIZE)
+ .selectedSplit()
+ .split()
+ .selectedSplit()
+ .split()
+ .selectedSplit()
+ .build()
+ )
+ .expectedSplits(
+ createMockSplit("A", 0, SMALL_SPLIT_SIZE),
+ createMockSplit("A", SMALL_SPLIT_SIZE * 2, SMALL_SPLIT_SIZE),
+ createMockSplit("A", SMALL_SPLIT_SIZE * 4, SMALL_SPLIT_SIZE)
+ )
+ .build();
+
+ testCase.executeAndValidate();
+ }
+
+ @Test
+ public void testMaxSplitsSizePositive() throws Exception {
+
+ testCase = SplitFilterTestCase.builder()
+ .maxInputSize(DEFAULT_SPLIT_SIZE * 3 + SYNC_INTERVAL * 2)
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .selectedSplit()
+ .split()
+ .selectedSplit()
+ .split()
+ .selectedSplit()
+ .build()
+ )
+ .expectedSplits(
+ createMockSplit("A", 0, DEFAULT_SPLIT_SIZE),
+ createMockSplit("A", DEFAULT_SPLIT_SIZE * 2 - SYNC_INTERVAL, DEFAULT_SPLIT_SIZE + SYNC_INTERVAL),
+ createMockSplit("A", DEFAULT_SPLIT_SIZE * 4 - SYNC_INTERVAL, DEFAULT_SPLIT_SIZE + SYNC_INTERVAL)
+ )
+ .build();
+
+ testCase.executeAndValidate();
+ }
+
+ @Test(expected = IOException.class)
+ public void testMaxSplitsSizeNegative() throws Exception {
+ testCase = SplitFilterTestCase.builder()
+ .maxInputSize(DEFAULT_SPLIT_SIZE * 3)
+ .inputFiles(
+ MockInputFile.builder()
+ .path("A")
+ .selectedSplit()
+ .split()
+ .selectedSplit()
+ .split()
+ .selectedSplit()
+ .build()
+ )
+ .expectedSplits()
+ .build();
+
+ testCase.executeAndValidate();
+ }
+}