Posted to commits@hive.apache.org by ga...@apache.org on 2015/01/26 22:21:46 UTC
svn commit: r1654899 [1/2] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/common/
common/src/test/org/apache/hadoop/hive/common/
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/
itests/hive-unit/src/test/java/org/apache/hadoop/...
Author: gates
Date: Mon Jan 26 21:21:45 2015
New Revision: 1654899
URL: http://svn.apache.org/r1654899
Log:
HIVE-8966 Delta files created by hive hcatalog streaming cannot be compacted (Jihong Liu and Alan Gates, reviewed by Owen O'Malley)
Added:
hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
Added: hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java?rev=1654899&view=auto
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java (added)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java Mon Jan 26 21:21:45 2015
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.util.Arrays;
+
+/**
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers.
+ * This class will view a transaction as valid only if it is committed. Both open and aborted
+ * transactions will be seen as invalid.
+ */
+public class ValidReadTxnList implements ValidTxnList {
+
+ protected long[] exceptions;
+ protected long highWatermark;
+
+ public ValidReadTxnList() {
+ this(new long[0], Long.MAX_VALUE);
+ }
+
+ public ValidReadTxnList(long[] exceptions, long highWatermark) {
+ if (exceptions.length == 0) {
+ this.exceptions = exceptions;
+ } else {
+ this.exceptions = exceptions.clone();
+ Arrays.sort(this.exceptions);
+ }
+ this.highWatermark = highWatermark;
+ }
+
+ public ValidReadTxnList(String value) {
+ readFromString(value);
+ }
+
+ @Override
+ public boolean isTxnValid(long txnid) {
+ if (highWatermark < txnid) {
+ return false;
+ }
+ return Arrays.binarySearch(exceptions, txnid) < 0;
+ }
+
+ @Override
+ public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
+ // check the easy cases first
+ if (highWatermark < minTxnId) {
+ return RangeResponse.NONE;
+ } else if (exceptions.length > 0 && exceptions[0] > maxTxnId) {
+ return RangeResponse.ALL;
+ }
+
+ // since the exceptions and the range in question overlap, count the
+ // exceptions in the range
+ long count = Math.max(0, maxTxnId - highWatermark);
+ for(long txn: exceptions) {
+ if (minTxnId <= txn && txn <= maxTxnId) {
+ count += 1;
+ }
+ }
+
+ if (count == 0) {
+ return RangeResponse.ALL;
+ } else if (count == (maxTxnId - minTxnId + 1)) {
+ return RangeResponse.NONE;
+ } else {
+ return RangeResponse.SOME;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return writeToString();
+ }
+
+ @Override
+ public String writeToString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(highWatermark);
+ if (exceptions.length == 0) {
+ buf.append(':');
+ } else {
+ for(long except: exceptions) {
+ buf.append(':');
+ buf.append(except);
+ }
+ }
+ return buf.toString();
+ }
+
+ @Override
+ public void readFromString(String src) {
+ if (src == null) {
+ highWatermark = Long.MAX_VALUE;
+ exceptions = new long[0];
+ } else {
+ String[] values = src.split(":");
+ highWatermark = Long.parseLong(values[0]);
+ exceptions = new long[values.length - 1];
+ for(int i = 1; i < values.length; ++i) {
+ exceptions[i-1] = Long.parseLong(values[i]);
+ }
+ }
+ }
+
+ @Override
+ public long getHighWatermark() {
+ return highWatermark;
+ }
+
+ @Override
+ public long[] getInvalidTransactions() {
+ return exceptions;
+ }
+}
+
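A minimal usage sketch of the class added above; the watermark and exception ids are hypothetical, chosen only to illustrate the validity semantics and the serialized string form:

    import org.apache.hadoop.hive.common.ValidReadTxnList;

    public class ReadTxnListSketch {
      public static void main(String[] args) {
        // Hypothetical reader snapshot: high watermark 10, txns 5 and 7 still open or aborted.
        ValidReadTxnList txns = new ValidReadTxnList(new long[]{5L, 7L}, 10L);
        System.out.println(txns.isTxnValid(4L));   // true  -- committed and below the watermark
        System.out.println(txns.isTxnValid(5L));   // false -- listed as an exception
        System.out.println(txns.isTxnValid(11L));  // false -- above the high watermark
        System.out.println(txns.writeToString());  // "10:5:7", round-trips through readFromString
      }
    }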
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java Mon Jan 26 21:21:45 2015
@@ -38,21 +38,23 @@ public interface ValidTxnList {
public enum RangeResponse {NONE, SOME, ALL};
/**
- * Indicates whether a given transaction has been committed and should be
- * viewed as valid for read.
+ * Indicates whether a given transaction is valid. Note that valid may have different meanings
+ * for different implementations, as some will only want to see committed transactions and some
+ * both committed and aborted.
* @param txnid id for the transaction
- * @return true if committed, false otherwise
+ * @return true if valid, false otherwise
*/
- public boolean isTxnCommitted(long txnid);
+ public boolean isTxnValid(long txnid);
/**
- * Find out if a range of transaction ids have been committed.
+ * Find out if a range of transaction ids are valid. Note that valid may have different meanings
+ * for different implementations, as some will only want to see committed transactions and some
+ * both committed and aborted.
* @param minTxnId minimum txnid to look for, inclusive
* @param maxTxnId maximum txnid to look for, inclusive
- * @return Indicate whether none, some, or all of these transactions have been
- * committed.
+ * @return Indicate whether none, some, or all of these transactions are valid.
*/
- public RangeResponse isTxnRangeCommitted(long minTxnId, long maxTxnId);
+ public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId);
/**
* Write this validTxnList into a string. This should produce a string that
@@ -74,9 +76,10 @@ public interface ValidTxnList {
public long getHighWatermark();
/**
- * Get the list of transactions under the high water mark that are still
- * open.
- * @return a list of open transaction ids
+ * Get the list of transactions under the high water mark that are not valid. Note that invalid
+ * may have different meanings for different implementations, as some will only want to see open
+ * transactions and some both open and aborted.
+ * @return a list of invalid transaction ids
*/
- public long[] getOpenTransactions();
+ public long[] getInvalidTransactions();
}
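A short sketch of how a caller typically branches on RangeResponse when deciding whether a delta directory is still of interest, mirroring the AcidUtils change later in this commit; the snapshot string and delta bounds are made up:

    import org.apache.hadoop.hive.common.ValidReadTxnList;
    import org.apache.hadoop.hive.common.ValidTxnList;

    public class DeltaRangeSketch {
      public static void main(String[] args) {
        ValidTxnList txnList = new ValidReadTxnList("10:5:7");   // hypothetical reader snapshot
        long deltaMin = 3L, deltaMax = 6L;                       // made-up bounds of delta_0000003_0000006
        if (txnList.isTxnRangeValid(deltaMin, deltaMax) != ValidTxnList.RangeResponse.NONE) {
          System.out.println("keep the delta: at least one of its transactions is readable");
        }
      }
    }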
Added: hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java?rev=1654899&view=auto
==============================================================================
--- hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java (added)
+++ hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java Mon Jan 26 21:21:45 2015
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+
+/**
+ * Tests for {@link ValidReadTxnList}
+ */
+public class TestValidReadTxnList {
+
+ @Test
+ public void noExceptions() throws Exception {
+ ValidTxnList txnList = new ValidReadTxnList(new long[0], 1);
+ String str = txnList.writeToString();
+ Assert.assertEquals("1:", str);
+ ValidTxnList newList = new ValidReadTxnList();
+ newList.readFromString(str);
+ Assert.assertTrue(newList.isTxnValid(1));
+ Assert.assertFalse(newList.isTxnValid(2));
+ }
+
+ @Test
+ public void exceptions() throws Exception {
+ ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5);
+ String str = txnList.writeToString();
+ Assert.assertEquals("5:2:4", str);
+ ValidTxnList newList = new ValidReadTxnList();
+ newList.readFromString(str);
+ Assert.assertTrue(newList.isTxnValid(1));
+ Assert.assertFalse(newList.isTxnValid(2));
+ Assert.assertTrue(newList.isTxnValid(3));
+ Assert.assertFalse(newList.isTxnValid(4));
+ Assert.assertTrue(newList.isTxnValid(5));
+ Assert.assertFalse(newList.isTxnValid(6));
+ }
+
+ @Test
+ public void longEnoughToCompress() throws Exception {
+ long[] exceptions = new long[1000];
+ for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
+ ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000);
+ String str = txnList.writeToString();
+ ValidTxnList newList = new ValidReadTxnList();
+ newList.readFromString(str);
+ for (int i = 0; i < 100; i++) Assert.assertTrue(newList.isTxnValid(i));
+ for (int i = 100; i < 1100; i++) Assert.assertFalse(newList.isTxnValid(i));
+ for (int i = 1100; i < 2001; i++) Assert.assertTrue(newList.isTxnValid(i));
+ Assert.assertFalse(newList.isTxnValid(2001));
+ }
+
+ @Test
+ public void readWriteConfig() throws Exception {
+ long[] exceptions = new long[1000];
+ for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
+ ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000);
+ String str = txnList.writeToString();
+ Configuration conf = new Configuration();
+ conf.set(ValidTxnList.VALID_TXNS_KEY, str);
+ File tmpFile = File.createTempFile("TestValidTxnImpl", "readWriteConfig");
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpFile));
+ conf.write(out);
+ out.close();
+ DataInputStream in = new DataInputStream(new FileInputStream(tmpFile));
+ Configuration newConf = new Configuration();
+ newConf.readFields(in);
+ Assert.assertEquals(str, newConf.get(ValidTxnList.VALID_TXNS_KEY));
+ }
+}
Modified: hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java (original)
+++ hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java Mon Jan 26 21:21:45 2015
@@ -107,7 +107,7 @@ public class StreamingIntegrationTester
.hasArgs()
.withArgName("partition-values")
.withDescription("partition values, must be provided in order of partition columns, " +
- "if not provided table is assumed to not be partitioned")
+ "if not provided table is assumed to not be partitioned")
.withLongOpt("partition")
.withValueSeparator(',')
.create('p'));
@@ -264,7 +264,7 @@ public class StreamingIntegrationTester
this.batches = batches;
this.writerNumber = writerNumber;
this.recordsPerTxn = recordsPerTxn;
- this.frequency = frequency;
+ this.frequency = frequency * 1000;
this.abortPct = abortPct;
this.cols = cols;
this.types = types;
@@ -279,8 +279,8 @@ public class StreamingIntegrationTester
conn = endPoint.newConnection(true);
RecordWriter writer = new DelimitedInputWriter(cols, ",", endPoint);
- long start = System.currentTimeMillis();
for (int i = 0; i < batches; i++) {
+ long start = System.currentTimeMillis();
LOG.info("Starting batch " + i);
TransactionBatch batch = conn.fetchTransactionBatch(txnsPerBatch, writer);
try {
Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java Mon Jan 26 21:21:45 2015
@@ -19,12 +19,8 @@ package org.apache.hadoop.hive.metastore
import junit.framework.Assert;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.LockComponentBuilder;
-import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
@@ -69,10 +65,10 @@ public class TestHiveMetaStoreTxns {
client.rollbackTxn(1);
client.commitTxn(2);
ValidTxnList validTxns = client.getValidTxns();
- Assert.assertFalse(validTxns.isTxnCommitted(1));
- Assert.assertTrue(validTxns.isTxnCommitted(2));
- Assert.assertFalse(validTxns.isTxnCommitted(3));
- Assert.assertFalse(validTxns.isTxnCommitted(4));
+ Assert.assertFalse(validTxns.isTxnValid(1));
+ Assert.assertTrue(validTxns.isTxnValid(2));
+ Assert.assertFalse(validTxns.isTxnValid(3));
+ Assert.assertFalse(validTxns.isTxnValid(4));
}
@Test
@@ -84,17 +80,17 @@ public class TestHiveMetaStoreTxns {
client.rollbackTxn(1);
client.commitTxn(2);
ValidTxnList validTxns = client.getValidTxns(3);
- Assert.assertFalse(validTxns.isTxnCommitted(1));
- Assert.assertTrue(validTxns.isTxnCommitted(2));
- Assert.assertTrue(validTxns.isTxnCommitted(3));
- Assert.assertFalse(validTxns.isTxnCommitted(4));
+ Assert.assertFalse(validTxns.isTxnValid(1));
+ Assert.assertTrue(validTxns.isTxnValid(2));
+ Assert.assertTrue(validTxns.isTxnValid(3));
+ Assert.assertFalse(validTxns.isTxnValid(4));
}
@Test
public void testTxnRange() throws Exception {
ValidTxnList validTxns = client.getValidTxns();
Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
- validTxns.isTxnRangeCommitted(1L, 3L));
+ validTxns.isTxnRangeValid(1L, 3L));
List<Long> tids = client.openTxns("me", 5).getTxn_ids();
HeartbeatTxnRangeResponse rsp = client.heartbeatTxnRange(1, 5);
@@ -108,43 +104,43 @@ public class TestHiveMetaStoreTxns {
validTxns = client.getValidTxns();
System.out.println("validTxns = " + validTxns);
Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
- validTxns.isTxnRangeCommitted(2L, 2L));
+ validTxns.isTxnRangeValid(2L, 2L));
Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
- validTxns.isTxnRangeCommitted(2L, 3L));
+ validTxns.isTxnRangeValid(2L, 3L));
Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
- validTxns.isTxnRangeCommitted(2L, 4L));
+ validTxns.isTxnRangeValid(2L, 4L));
Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
- validTxns.isTxnRangeCommitted(3L, 4L));
+ validTxns.isTxnRangeValid(3L, 4L));
Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
- validTxns.isTxnRangeCommitted(1L, 4L));
+ validTxns.isTxnRangeValid(1L, 4L));
Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
- validTxns.isTxnRangeCommitted(2L, 5L));
+ validTxns.isTxnRangeValid(2L, 5L));
Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
- validTxns.isTxnRangeCommitted(1L, 2L));
+ validTxns.isTxnRangeValid(1L, 2L));
Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
- validTxns.isTxnRangeCommitted(4L, 5L));
+ validTxns.isTxnRangeValid(4L, 5L));
Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
- validTxns.isTxnRangeCommitted(1L, 1L));
+ validTxns.isTxnRangeValid(1L, 1L));
Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
- validTxns.isTxnRangeCommitted(5L, 10L));
+ validTxns.isTxnRangeValid(5L, 10L));
- validTxns = new ValidTxnListImpl("10:4:5:6");
+ validTxns = new ValidReadTxnList("10:4:5:6");
Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
- validTxns.isTxnRangeCommitted(4,6));
+ validTxns.isTxnRangeValid(4,6));
Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
- validTxns.isTxnRangeCommitted(7, 10));
+ validTxns.isTxnRangeValid(7, 10));
Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
- validTxns.isTxnRangeCommitted(7, 11));
+ validTxns.isTxnRangeValid(7, 11));
Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
- validTxns.isTxnRangeCommitted(3, 6));
+ validTxns.isTxnRangeValid(3, 6));
Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
- validTxns.isTxnRangeCommitted(4, 7));
+ validTxns.isTxnRangeValid(4, 7));
Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
- validTxns.isTxnRangeCommitted(1, 12));
+ validTxns.isTxnRangeValid(1, 12));
Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
- validTxns.isTxnRangeCommitted(1, 3));
+ validTxns.isTxnRangeValid(1, 3));
}
@Test
@@ -219,32 +215,32 @@ public class TestHiveMetaStoreTxns {
@Test
public void stringifyValidTxns() throws Exception {
// Test with just high water mark
- ValidTxnList validTxns = new ValidTxnListImpl("1:");
+ ValidTxnList validTxns = new ValidReadTxnList("1:");
String asString = validTxns.toString();
Assert.assertEquals("1:", asString);
- validTxns = new ValidTxnListImpl(asString);
+ validTxns = new ValidReadTxnList(asString);
Assert.assertEquals(1, validTxns.getHighWatermark());
- Assert.assertNotNull(validTxns.getOpenTransactions());
- Assert.assertEquals(0, validTxns.getOpenTransactions().length);
+ Assert.assertNotNull(validTxns.getInvalidTransactions());
+ Assert.assertEquals(0, validTxns.getInvalidTransactions().length);
asString = validTxns.toString();
Assert.assertEquals("1:", asString);
- validTxns = new ValidTxnListImpl(asString);
+ validTxns = new ValidReadTxnList(asString);
Assert.assertEquals(1, validTxns.getHighWatermark());
- Assert.assertNotNull(validTxns.getOpenTransactions());
- Assert.assertEquals(0, validTxns.getOpenTransactions().length);
+ Assert.assertNotNull(validTxns.getInvalidTransactions());
+ Assert.assertEquals(0, validTxns.getInvalidTransactions().length);
// Test with open transactions
- validTxns = new ValidTxnListImpl("10:5:3");
+ validTxns = new ValidReadTxnList("10:5:3");
asString = validTxns.toString();
if (!asString.equals("10:3:5") && !asString.equals("10:5:3")) {
Assert.fail("Unexpected string value " + asString);
}
- validTxns = new ValidTxnListImpl(asString);
+ validTxns = new ValidReadTxnList(asString);
Assert.assertEquals(10, validTxns.getHighWatermark());
- Assert.assertNotNull(validTxns.getOpenTransactions());
- Assert.assertEquals(2, validTxns.getOpenTransactions().length);
+ Assert.assertNotNull(validTxns.getInvalidTransactions());
+ Assert.assertEquals(2, validTxns.getInvalidTransactions().length);
boolean sawThree = false, sawFive = false;
- for (long tid : validTxns.getOpenTransactions()) {
+ for (long tid : validTxns.getInvalidTransactions()) {
if (tid == 3) sawThree = true;
else if (tid == 5) sawFive = true;
else Assert.fail("Unexpected value " + tid);
Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java Mon Jan 26 21:21:45 2015
@@ -1,8 +1,12 @@
package org.apache.hadoop.hive.ql.txn.compactor;
-import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -15,20 +19,28 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
+import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -184,7 +196,7 @@ public class TestCompactor {
Assert.assertEquals("numNdv a", 1, colAStats.getNumDVs());
StringColumnStatsData colBStats = colStats.get(1).getStatsData().getStringStats();
Assert.assertEquals("maxColLen b", 3, colBStats.getMaxColLen());
- Assert.assertEquals("avgColLen b", 3.0, colBStats.getAvgColLen());
+ Assert.assertEquals("avgColLen b", 3.0, colBStats.getAvgColLen(), 0.01);
Assert.assertEquals("numNulls b", 0, colBStats.getNumNulls());
Assert.assertEquals("nunDVs", 2, colBStats.getNumDVs());
@@ -268,6 +280,299 @@ public class TestCompactor {
colBStatsPart2, colStats.get(1).getStatsData().getStringStats());
}
+ @Test
+ public void minorCompactWhileStreaming() throws Exception {
+ String dbName = "default";
+ String tblName = "cws";
+ List<String> colNames = Arrays.asList("a", "b");
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC", driver);
+
+ HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+ DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+ try {
+ // Write a couple of batches
+ for (int i = 0; i < 2; i++) {
+ writeBatch(connection, writer, false);
+ }
+
+ // Start a third batch, but don't close it.
+ writeBatch(connection, writer, true);
+
+ // Now, compact
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat =
+ fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+ String[] names = new String[stat.length];
+ Path resultFile = null;
+ for (int i = 0; i < names.length; i++) {
+ names[i] = stat[i].getPath().getName();
+ if (names[i].equals("delta_0000001_0000004")) {
+ resultFile = stat[i].getPath();
+ }
+ }
+ Arrays.sort(names);
+ Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
+ "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"});
+ checkExpectedTxnsPresent(null, new Path[]{resultFile}, 0, 1L, 4L);
+
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void majorCompactWhileStreaming() throws Exception {
+ String dbName = "default";
+ String tblName = "cws";
+ List<String> colNames = Arrays.asList("a", "b");
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC", driver);
+
+ HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+ DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+ try {
+ // Write a couple of batches
+ for (int i = 0; i < 2; i++) {
+ writeBatch(connection, writer, false);
+ }
+
+ // Start a third batch, but don't close it.
+ writeBatch(connection, writer, true);
+
+ // Now, compact
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat =
+ fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
+ Assert.assertEquals(1, stat.length);
+ String name = stat[0].getPath().getName();
+ Assert.assertEquals(name, "base_0000004");
+ checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void minorCompactAfterAbort() throws Exception {
+ String dbName = "default";
+ String tblName = "cws";
+ List<String> colNames = Arrays.asList("a", "b");
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC", driver);
+
+ HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+ DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+ try {
+ // Write a couple of batches
+ for (int i = 0; i < 2; i++) {
+ writeBatch(connection, writer, false);
+ }
+
+ // Start a third batch, abort everything, don't properly close it
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.abort();
+ txnBatch.beginNextTransaction();
+ txnBatch.abort();
+
+ // Now, compact
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat =
+ fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+ String[] names = new String[stat.length];
+ Path resultDelta = null;
+ for (int i = 0; i < names.length; i++) {
+ names[i] = stat[i].getPath().getName();
+ if (names[i].equals("delta_0000001_0000006")) {
+ resultDelta = stat[i].getPath();
+ }
+ }
+ Arrays.sort(names);
+ Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
+ "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"});
+ checkExpectedTxnsPresent(null, new Path[]{resultDelta}, 0, 1L, 4L);
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void majorCompactAfterAbort() throws Exception {
+ String dbName = "default";
+ String tblName = "cws";
+ List<String> colNames = Arrays.asList("a", "b");
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC", driver);
+
+ HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+ DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+ try {
+ // Write a couple of batches
+ for (int i = 0; i < 2; i++) {
+ writeBatch(connection, writer, false);
+ }
+
+ // Start a third batch, but don't close it.
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.abort();
+ txnBatch.beginNextTransaction();
+ txnBatch.abort();
+
+
+ // Now, compact
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat =
+ fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
+ Assert.assertEquals(1, stat.length);
+ String name = stat[0].getPath().getName();
+ Assert.assertEquals(name, "base_0000006");
+ checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+ } finally {
+ connection.close();
+ }
+ }
+ private void writeBatch(StreamingConnection connection, DelimitedInputWriter writer,
+ boolean closeEarly)
+ throws InterruptedException, StreamingException {
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("50,Kiev".getBytes());
+ txnBatch.write("51,St. Petersburg".getBytes());
+ txnBatch.write("44,Boston".getBytes());
+ txnBatch.commit();
+
+ if (!closeEarly) {
+ txnBatch.beginNextTransaction();
+ txnBatch.write("52,Tel Aviv".getBytes());
+ txnBatch.write("53,Atlantis".getBytes());
+ txnBatch.write("53,Boston".getBytes());
+ txnBatch.commit();
+
+ txnBatch.close();
+ }
+ }
+
+ private void checkExpectedTxnsPresent(Path base, Path[] deltas, int bucket, long min, long max)
+ throws IOException {
+ ValidTxnList txnList = new ValidTxnList() {
+ @Override
+ public boolean isTxnValid(long txnid) {
+ return true;
+ }
+
+ @Override
+ public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
+ return RangeResponse.ALL;
+ }
+
+ @Override
+ public String writeToString() {
+ return "";
+ }
+
+ @Override
+ public void readFromString(String src) {
+
+ }
+
+ @Override
+ public long getHighWatermark() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public long[] getInvalidTransactions() {
+ return new long[0];
+ }
+ };
+
+ OrcInputFormat aif = new OrcInputFormat();
+
+ AcidInputFormat.RawReader<OrcStruct> reader =
+ aif.getRawReader(new Configuration(), false, bucket, txnList, base, deltas);
+ RecordIdentifier identifier = reader.createKey();
+ OrcStruct value = reader.createValue();
+ long currentTxn = min;
+ boolean seenCurrentTxn = false;
+ while (reader.next(identifier, value)) {
+ if (!seenCurrentTxn) {
+ Assert.assertEquals(currentTxn, identifier.getTransactionId());
+ seenCurrentTxn = true;
+ }
+ if (currentTxn != identifier.getTransactionId()) {
+ Assert.assertEquals(currentTxn + 1, identifier.getTransactionId());
+ currentTxn++;
+ }
+ }
+ Assert.assertEquals(max, currentTxn);
+ }
+
/**
* convenience method to execute a select stmt and dump results to log file
*/
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java Mon Jan 26 21:21:45 2015
@@ -1747,12 +1747,12 @@ public class HiveMetaStoreClient impleme
@Override
public ValidTxnList getValidTxns() throws TException {
- return TxnHandler.createValidTxnList(client.get_open_txns(), 0);
+ return TxnHandler.createValidReadTxnList(client.get_open_txns(), 0);
}
@Override
public ValidTxnList getValidTxns(long currentTxn) throws TException {
- return TxnHandler.createValidTxnList(client.get_open_txns(), currentTxn);
+ return TxnHandler.createValidReadTxnList(client.get_open_txns(), currentTxn);
}
@Override
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Mon Jan 26 21:21:45 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.util.StringUtils;
@@ -576,6 +577,25 @@ public class CompactionTxnHandler extend
return findColumnsWithStats(ci);
}
}
+
+ /**
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
+ * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
+ * compact the files, and thus treats only open transactions as invalid.
+ * @param txns txn list from the metastore
+ * @return a valid txn list.
+ */
+ public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
+ long highWater = txns.getTxn_high_water_mark();
+ long minOpenTxn = Long.MAX_VALUE;
+ long[] exceptions = new long[txns.getOpen_txnsSize()];
+ int i = 0;
+ for (TxnInfo txn : txns.getOpen_txns()) {
+ if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
+ exceptions[i++] = txn.getId();
+ }
+ return new ValidCompactorTxnList(exceptions, minOpenTxn, highWater);
+ }
}
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Mon Jan 26 21:21:45 2015
@@ -29,7 +29,7 @@ import org.apache.commons.dbcp.PoolingDa
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -249,14 +249,15 @@ public class TxnHandler {
/**
* Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
- * {@link org.apache.hadoop.hive.common.ValidTxnList}.
+ * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
+ * read the files, and thus treats both open and aborted transactions as invalid.
* @param txns txn list from the metastore
* @param currentTxn Current transaction that the user has open. If this is greater than 0 it
* will be removed from the exceptions list so that the user sees his own
* transaction as valid.
* @return a valid txn list.
*/
- public static ValidTxnList createValidTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
long highWater = txns.getTxn_high_water_mark();
Set<Long> open = txns.getOpen_txns();
long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
@@ -265,7 +266,7 @@ public class TxnHandler {
if (currentTxn > 0 && currentTxn == txn) continue;
exceptions[i++] = txn;
}
- return new ValidTxnListImpl(exceptions, highWater);
+ return new ValidReadTxnList(exceptions, highWater);
}
public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
Added: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java?rev=1654899&view=auto
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java (added)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java Mon Jan 26 21:21:45 2015
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.txn;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+
+import java.util.Arrays;
+
+/**
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
+ * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
+ * is committed or aborted. Additionally it will return NONE if there are any open transactions
+ * below the max transaction given, since we don't want to compact above open transactions. For
+ * {@link #isTxnValid} it will still view a transaction as valid only if it is committed. These
+ * produce the logic we need to ensure that the compactor only sees records less than the lowest
+ * open transaction when choosing which files to compact, but that it still ignores aborted
+ * records when compacting.
+ */
+public class ValidCompactorTxnList extends ValidReadTxnList {
+
+ // The minimum open transaction id
+ private long minOpenTxn;
+
+ public ValidCompactorTxnList() {
+ super();
+ minOpenTxn = -1;
+ }
+
+ /**
+ *
+ * @param exceptions list of all open and aborted transactions
+ * @param minOpen lowest open transaction
+ * @param highWatermark highest committed transaction
+ */
+ public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) {
+ super(exceptions, highWatermark);
+ minOpenTxn = minOpen;
+ }
+
+ public ValidCompactorTxnList(String value) {
+ super(value);
+ }
+
+ @Override
+ public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
+ if (highWatermark < minTxnId) {
+ return RangeResponse.NONE;
+ } else if (minOpenTxn < 0) {
+ return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
+ } else {
+ return minOpenTxn > maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
+ }
+ }
+
+ @Override
+ public String writeToString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(highWatermark);
+ buf.append(':');
+ buf.append(minOpenTxn);
+ if (exceptions.length == 0) {
+ buf.append(':');
+ } else {
+ for(long except: exceptions) {
+ buf.append(':');
+ buf.append(except);
+ }
+ }
+ return buf.toString();
+ }
+
+ @Override
+ public void readFromString(String src) {
+ if (src == null) {
+ highWatermark = Long.MAX_VALUE;
+ exceptions = new long[0];
+ } else {
+ String[] values = src.split(":");
+ highWatermark = Long.parseLong(values[0]);
+ minOpenTxn = Long.parseLong(values[1]);
+ exceptions = new long[values.length - 2];
+ for(int i = 2; i < values.length; ++i) {
+ exceptions[i-2] = Long.parseLong(values[i]);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ long getMinOpenTxn() {
+ return minOpenTxn;
+ }
+}
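A compact illustration of the asymmetry described in the class comment above, using made-up transaction ids: the range check lets a delta containing only an aborted transaction be compacted, while the per-record check still skips the aborted rows during the merge:

    import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;

    public class CompactorValiditySketch {
      public static void main(String[] args) {
        // Hypothetical: txn 4 was aborted, nothing is open, high watermark 9.
        ValidCompactorTxnList txns = new ValidCompactorTxnList(new long[]{4L}, -1L, 9L);
        System.out.println(txns.isTxnRangeValid(3L, 5L));  // ALL   -- the delta covering 3..5 may be compacted
        System.out.println(txns.isTxnValid(4L));           // false -- its aborted records are dropped while merging
      }
    }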
Added: hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java?rev=1654899&view=auto
==============================================================================
--- hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java (added)
+++ hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java Mon Jan 26 21:21:45 2015
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestValidCompactorTxnList {
+
+ @Test
+ public void minTxnHigh() {
+ ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 4}, 3, 5);
+ ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+ Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
+ }
+
+ @Test
+ public void maxTxnLow() {
+ ValidTxnList txns = new ValidCompactorTxnList(new long[]{13, 14}, 13, 15);
+ ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+ Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp);
+ }
+
+ @Test
+ public void minTxnHighNoExceptions() {
+ ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 5);
+ ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+ Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
+ }
+
+ @Test
+ public void maxTxnLowNoExceptions() {
+ ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 15);
+ ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+ Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp);
+ }
+
+ @Test
+ public void exceptionsAllBelow() {
+ ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 6}, 3, 15);
+ ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+ Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
+ }
+
+ @Test
+ public void exceptionsInMidst() {
+ ValidTxnList txns = new ValidCompactorTxnList(new long[]{8}, 8, 15);
+ ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+ Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
+ }
+
+ @Test
+ public void writeToString() {
+ ValidTxnList txns = new ValidCompactorTxnList(new long[]{9, 7, 10}, 9, 37);
+ Assert.assertEquals("37:9:7:9:10", txns.writeToString());
+ txns = new ValidCompactorTxnList();
+ Assert.assertEquals(Long.toString(Long.MAX_VALUE) + ":-1:", txns.writeToString());
+ txns = new ValidCompactorTxnList(new long[0], -1, 23);
+ Assert.assertEquals("23:-1:", txns.writeToString());
+ }
+
+ @Test
+ public void readFromString() {
+ ValidCompactorTxnList txns = new ValidCompactorTxnList("37:9:7:9:10");
+ Assert.assertEquals(37L, txns.getHighWatermark());
+ Assert.assertEquals(9L, txns.getMinOpenTxn());
+ Assert.assertArrayEquals(new long[]{7L, 9L, 10L}, txns.getInvalidTransactions());
+ txns = new ValidCompactorTxnList("21:-1:");
+ Assert.assertEquals(21L, txns.getHighWatermark());
+ Assert.assertEquals(-1L, txns.getMinOpenTxn());
+ Assert.assertEquals(0, txns.getInvalidTransactions().length);
+
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Mon Jan 26 21:21:45 2015
@@ -43,7 +43,14 @@ public class AcidUtils {
// This key will be put in the conf file when planning an acid operation
public static final String CONF_ACID_KEY = "hive.doing.acid";
public static final String BASE_PREFIX = "base_";
+ public static final PathFilter baseFileFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(BASE_PREFIX);
+ }
+ };
public static final String DELTA_PREFIX = "delta_";
+ public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
public static final PathFilter deltaFileFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
@@ -54,7 +61,8 @@ public class AcidUtils {
public static final PathFilter bucketFileFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
- return path.getName().startsWith(BUCKET_PREFIX);
+ return path.getName().startsWith(BUCKET_PREFIX) &&
+ !path.getName().endsWith(DELTA_SIDE_FILE_SUFFIX);
}
};
public static final String BUCKET_DIGITS = "%05d";
@@ -369,7 +377,7 @@ public class AcidUtils {
}
} else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) {
ParsedDelta delta = parseDelta(child);
- if (txnList.isTxnRangeCommitted(delta.minTransaction,
+ if (txnList.isTxnRangeValid(delta.minTransaction,
delta.maxTransaction) !=
ValidTxnList.RangeResponse.NONE) {
working.add(delta);
@@ -402,7 +410,7 @@ public class AcidUtils {
for(ParsedDelta next: working) {
if (next.maxTransaction > current) {
// are any of the new transactions ones that we care about?
- if (txnList.isTxnRangeCommitted(current+1, next.maxTransaction) !=
+ if (txnList.isTxnRangeValid(current+1, next.maxTransaction) !=
ValidTxnList.RangeResponse.NONE) {
deltas.add(next);
current = next.maxTransaction;
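For readers following the new path filters, a minimal sketch of how they are typically exercised against a table location; the location argument and configuration are assumptions, and the listStatus calls mirror those in TestCompactor above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class ListAcidDirs {
      public static void main(String[] args) throws Exception {
        Path tableLocation = new Path(args[0]);                 // e.g. the table's warehouse directory
        FileSystem fs = tableLocation.getFileSystem(new Configuration());
        // base_N directories produced by major compaction
        for (FileStatus s : fs.listStatus(tableLocation, AcidUtils.baseFileFilter)) {
          System.out.println("base:  " + s.getPath().getName());
        }
        // delta_min_max directories; note that bucketFileFilter now excludes the
        // *_flush_length side files that streaming ingest writes next to open buckets.
        for (FileStatus s : fs.listStatus(tableLocation, AcidUtils.deltaFileFilter)) {
          System.out.println("delta: " + s.getPath().getName());
        }
      }
    }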
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Mon Jan 26 21:21:45 2015
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -408,7 +408,7 @@ public class OrcInputFormat implements
}
String value = conf.get(ValidTxnList.VALID_TXNS_KEY,
Long.MAX_VALUE + ":");
- transactionList = new ValidTxnListImpl(value);
+ transactionList = new ValidReadTxnList(value);
}
int getSchedulers() {
@@ -1132,7 +1132,7 @@ public class OrcInputFormat implements
}
String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
Long.MAX_VALUE + ":");
- ValidTxnList validTxnList = new ValidTxnListImpl(txnString);
+ ValidTxnList validTxnList = new ValidReadTxnList(txnString);
final OrcRawRecordMerger records =
new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
validTxnList, readOptions, deltas);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java Mon Jan 26 21:21:45 2015
@@ -575,7 +575,7 @@ public class OrcRawRecordMerger implemen
}
// if this transaction isn't ok, skip over it
- if (!validTxnList.isTxnCommitted(
+ if (!validTxnList.isTxnValid(
((ReaderKey) recordIdentifier).getCurrentTransactionId())) {
continue;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java Mon Jan 26 21:21:45 2015
@@ -129,7 +129,7 @@ public class OrcRecordUpdater implements
}
static Path getSideFile(Path main) {
- return new Path(main + "_flush_length");
+ return new Path(main + AcidUtils.DELTA_SIDE_FILE_SUFFIX);
}
static int getOperation(OrcStruct struct) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java Mon Jan 26 21:21:45 2015
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.lockmg
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.Context;
@@ -199,7 +199,7 @@ class DummyTxnManager extends HiveTxnMan
@Override
public ValidTxnList getValidTxns() throws LockException {
- return new ValidTxnListImpl();
+ return new ValidReadTxnList();
}
@Override
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java Mon Jan 26 21:21:45 2015
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
@@ -183,7 +183,7 @@ public class Cleaner extends CompactorTh
// Create a bogus validTxnList with a high water mark set to MAX_LONG and no open
// transactions. This assures that all deltas are treated as valid and all we return are
// obsolete files.
- final ValidTxnList txnList = new ValidTxnListImpl();
+ final ValidTxnList txnList = new ValidReadTxnList();
if (runJobAsSelf(ci.runAs)) {
removeFiles(location, txnList);
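Note: the no-argument ValidReadTxnList built here is the same "maximal" view that createMaximalTxnList() in TestOrcRawRecordMerger (further down in this patch) spells out explicitly. A minimal sketch, not part of the patch, of what the Cleaner is relying on:

    // High water mark at Long.MAX_VALUE with no open or aborted transactions:
    // every delta directory is treated as valid, so getAcidState() leaves the
    // Cleaner nothing but obsolete files to remove.
    ValidTxnList cleanerView = new ValidReadTxnList();
    ValidTxnList sameView    = new ValidReadTxnList(Long.MAX_VALUE + ":");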
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java Mon Jan 26 21:21:45 2015
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -159,7 +159,7 @@ public class CompactorMR {
if (parsedDeltas == null || parsedDeltas.size() == 0) {
// Seriously, no deltas? Can't compact that.
- LOG.error("No delta files found to compact in " + sd.getLocation());
+ LOG.error( "No delta files found to compact in " + sd.getLocation());
return;
}
@@ -505,7 +505,7 @@ public class CompactorMR {
AcidInputFormat<WritableComparable, V> aif =
instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME));
ValidTxnList txnList =
- new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
+ new ValidReadTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
AcidInputFormat.RawReader<V> reader =
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Mon Jan 26 21:21:45 2015
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -77,7 +78,8 @@ public class Initiator extends Compactor
// don't doom the entire thread.
try {
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
- ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0);
+ ValidTxnList txns =
+ CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java Mon Jan 26 21:21:45 2015
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
@@ -121,7 +122,8 @@ public class Worker extends CompactorThr
final boolean isMajor = ci.isMajorCompaction();
final ValidTxnList txns =
- TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0);
+ CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+ LOG.debug("ValidCompactTxnList: " + txns.writeToString());
final StringBuffer jobName = new StringBuffer(name);
jobName.append("-compactor-");
jobName.append(ci.getFullPartitionName());
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java Mon Jan 26 21:21:45 2015
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -51,7 +51,6 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -114,7 +113,7 @@ public class TestFileSinkOperator {
"testFileSinkOperator");
tmpdir.mkdir();
tmpdir.deleteOnExit();
- txnList = new ValidTxnListImpl(new long[]{}, 2);
+ txnList = new ValidReadTxnList(new long[]{}, 2);
}
@Test
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java Mon Jan 26 21:21:45 2015
@@ -20,7 +20,8 @@ package org.apache.hadoop.hive.ql.io;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
@@ -91,7 +92,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0]));
AcidUtils.Directory dir =
AcidUtils.getAcidState(new MockPath(fs, "/tbl/part1"), conf,
- new ValidTxnListImpl("100:"));
+ new ValidReadTxnList("100:"));
assertEquals(null, dir.getBaseDirectory());
assertEquals(0, dir.getCurrentDirectories().size());
assertEquals(0, dir.getObsolete().size());
@@ -121,7 +122,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delta_101_101/bucket_0", 0, new byte[0]));
AcidUtils.Directory dir =
AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
- "mock:/tbl/part1"), conf, new ValidTxnListImpl("100:"));
+ "mock:/tbl/part1"), conf, new ValidReadTxnList("100:"));
assertEquals(null, dir.getBaseDirectory());
List<FileStatus> obsolete = dir.getObsolete();
assertEquals(2, obsolete.size());
@@ -162,7 +163,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delta_90_120/bucket_0", 0, new byte[0]));
AcidUtils.Directory dir =
AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
- "mock:/tbl/part1"), conf, new ValidTxnListImpl("100:"));
+ "mock:/tbl/part1"), conf, new ValidReadTxnList("100:"));
assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
List<FileStatus> obsolete = dir.getObsolete();
assertEquals(5, obsolete.size());
@@ -191,7 +192,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0]));
Path part = new MockPath(fs, "/tbl/part1");
AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:"));
+ AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString());
List<FileStatus> obsoletes = dir.getObsolete();
assertEquals(4, obsoletes.size());
@@ -202,7 +203,7 @@ public class TestAcidUtils {
assertEquals(0, dir.getOriginalFiles().size());
assertEquals(0, dir.getCurrentDirectories().size());
// we should always get the latest base
- dir = AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("10:"));
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:"));
assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString());
}
@@ -216,7 +217,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/000001_1", 500, new byte[0]));
Path part = new MockPath(fs, "/tbl/part1");
AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:"));
+ AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
// The two original buckets won't be in the obsolete list because we don't look at those
// until we have determined there is no base.
List<FileStatus> obsolete = dir.getObsolete();
@@ -239,7 +240,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
Path part = new MockPath(fs, "mock:/tbl/part1");
AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("100:"));
+ AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
List<FileStatus> obsolete = dir.getObsolete();
assertEquals(2, obsolete.size());
@@ -252,4 +253,51 @@ public class TestAcidUtils {
assertEquals("mock:/tbl/part1/delta_000062_62", delts.get(2).getPath().toString());
assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(3).getPath().toString());
}
+
+ @Test
+ public void deltasWithOpenTxnInRead() throws Exception {
+ Configuration conf = new Configuration();
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]));
+ Path part = new MockPath(fs, "mock:/tbl/part1");
+ AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4"));
+ List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+ assertEquals(2, delts.size());
+ assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
+ }
+
+ @Test
+ public void deltasWithOpenTxnsNotInCompact() throws Exception {
+ Configuration conf = new Configuration();
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]));
+ Path part = new MockPath(fs, "mock:/tbl/part1");
+ AcidUtils.Directory dir =
+ AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4"));
+ List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+ assertEquals(1, delts.size());
+ assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+ }
+
+ @Test
+ public void deltasWithOpenTxnsNotInCompact2() throws Exception {
+ Configuration conf = new Configuration();
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_2_5/bucket_0" + AcidUtils.DELTA_SIDE_FILE_SUFFIX, 500,
+ new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0]));
+ Path part = new MockPath(fs, "mock:/tbl/part1");
+ AcidUtils.Directory dir =
+ AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4"));
+ List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+ assertEquals(1, delts.size());
+ assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+ }
+
+
}
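The three new tests above capture the behavioral split this patch introduces: when an open transaction falls inside a delta's range, a reader still picks up the delta directory (and filters individual rows later via isTxnValid(), as in the OrcRawRecordMerger change earlier), while the compactor's view drops the directory entirely so nothing is ever compacted past an open transaction. A condensed, hypothetical restatement of those tests (part and conf as set up in the tests; not part of the patch):

    // Transaction 4 is still open; high water mark is 100.
    ValidTxnList readerView    = new ValidReadTxnList("100:4");
    ValidTxnList compactorView = new ValidCompactorTxnList("100:4");

    // Reader keeps both deltas and resolves txn 4 row by row later.
    AcidUtils.getAcidState(part, conf, readerView).getCurrentDirectories();
    //   -> delta_1_1, delta_2_5

    // Compactor refuses to merge across the open transaction.
    AcidUtils.getAcidState(part, conf, compactorView).getCurrentDirectories();
    //   -> delta_1_1 only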
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java Mon Jan 26 21:21:45 2015
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -301,7 +301,7 @@ public class TestOrcRawRecordMerger {
}
private static ValidTxnList createMaximalTxnList() {
- return new ValidTxnListImpl(Long.MAX_VALUE + ":");
+ return new ValidReadTxnList(Long.MAX_VALUE + ":");
}
@Test
@@ -492,7 +492,7 @@ public class TestOrcRawRecordMerger {
.maximumTransactionId(100);
of.getRecordUpdater(root, options).close(false);
- ValidTxnList txnList = new ValidTxnListImpl("200:");
+ ValidTxnList txnList = new ValidReadTxnList("200:");
AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
@@ -550,7 +550,7 @@ public class TestOrcRawRecordMerger {
ru.delete(200, new MyRow("", 8, 0, BUCKET));
ru.close(false);
- ValidTxnList txnList = new ValidTxnListImpl("200:");
+ ValidTxnList txnList = new ValidReadTxnList("200:");
AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
@@ -734,7 +734,7 @@ public class TestOrcRawRecordMerger {
merger.close();
// try ignoring the 200 transaction and make sure it works still
- ValidTxnList txns = new ValidTxnListImpl("2000:200");
+ ValidTxnList txns = new ValidReadTxnList("2000:200");
merger =
new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
txns, new Reader.Options(),
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java Mon Jan 26 21:21:45 2015
@@ -51,6 +51,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -156,6 +157,11 @@ public abstract class CompactorTest {
addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
}
+ protected void addLengthFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords)
+ throws Exception {
+ addFile(t, p, minTxn, maxTxn, numRecords, FileType.LENGTH_FILE, 2, true);
+ }
+
protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords) throws Exception {
addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
}
@@ -185,9 +191,21 @@ public abstract class CompactorTest {
return paths;
}
- protected void burnThroughTransactions(int num) throws MetaException, NoSuchTxnException, TxnAbortedException {
+ protected void burnThroughTransactions(int num)
+ throws MetaException, NoSuchTxnException, TxnAbortedException {
+ burnThroughTransactions(num, null, null);
+ }
+
+ protected void burnThroughTransactions(int num, Set<Long> open, Set<Long> aborted)
+ throws MetaException, NoSuchTxnException, TxnAbortedException {
OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", "localhost"));
- for (long tid : rsp.getTxn_ids()) txnHandler.commitTxn(new CommitTxnRequest(tid));
+ for (long tid : rsp.getTxn_ids()) {
+ if (aborted != null && aborted.contains(tid)) {
+ txnHandler.abortTxn(new AbortTxnRequest(tid));
+ } else if (open == null || !open.contains(tid)) {
+ txnHandler.commitTxn(new CommitTxnRequest(tid));
+ }
+ }
}
protected void stopThread() {
@@ -249,7 +267,7 @@ public abstract class CompactorTest {
return location;
}
- private enum FileType {BASE, DELTA, LEGACY};
+ private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE};
private void addFile(Table t, Partition p, long minTxn, long maxTxn,
int numRecords, FileType type, int numBuckets,
@@ -259,6 +277,7 @@ public abstract class CompactorTest {
String filename = null;
switch (type) {
case BASE: filename = "base_" + maxTxn; break;
+ case LENGTH_FILE: // Fall through to delta
case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
case LEGACY: break; // handled below
}
@@ -273,12 +292,19 @@ public abstract class CompactorTest {
Path dir = new Path(location, filename);
fs.mkdirs(dir);
partFile = AcidUtils.createBucketFile(dir, bucket);
+ if (type == FileType.LENGTH_FILE) {
+ partFile = new Path(partFile.toString() + AcidUtils.DELTA_SIDE_FILE_SUFFIX);
+ }
}
FSDataOutputStream out = fs.create(partFile);
- for (int i = 0; i < numRecords; i++) {
- RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i);
- ri.write(out);
- out.writeBytes("mary had a little lamb its fleece was white as snow\n");
+ if (type == FileType.LENGTH_FILE) {
+ out.writeInt(numRecords);
+ } else {
+ for (int i = 0; i < numRecords; i++) {
+ RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i);
+ ri.write(out);
+ out.writeBytes("mary had a little lamb its fleece was white as snow\n");
+ }
}
out.close();
}
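The new LENGTH_FILE type lets these tests lay down the "flush length" side file that streaming ingest leaves next to a delta's bucket files while the delta is still being written. The path it builds lines up with OrcRecordUpdater.getSideFile() as changed earlier in this patch; a small hypothetical example (paths invented for illustration, not part of the patch):

    // AcidUtils.DELTA_SIDE_FILE_SUFFIX is the old "_flush_length" literal, now a constant.
    Path bucket = new Path("mock:/tbl/part1/delta_2_5/bucket_0");
    Path side   = new Path(bucket + AcidUtils.DELTA_SIDE_FILE_SUFFIX);
    //   -> mock:/tbl/part1/delta_2_5/bucket_0_flush_length, as in deltasWithOpenTxnsNotInCompact2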