Posted to commits@hive.apache.org by ga...@apache.org on 2017/08/04 21:14:32 UTC
[11/17] hive git commit: HIVE-17234 Remove HBase metastore from master (Alan Gates, reviewed by Daniel Dai and Sergey Shelukhin)
http://git-wip-us.apache.org/repos/asf/hive/blob/5e061557/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java
new file mode 100644
index 0000000..2542a00
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DateColumnStatsMerger.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
+
+public class DateColumnStatsMerger extends ColumnStatsMerger {
+ @Override
+ public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ DateColumnStatsData aggregateData = aggregateColStats.getStatsData().getDateStats();
+ DateColumnStatsData newData = newColStats.getStatsData().getDateStats();
+ Date lowValue = aggregateData.getLowValue().compareTo(newData.getLowValue()) < 0 ? aggregateData
+ .getLowValue() : newData.getLowValue();
+ aggregateData.setLowValue(lowValue);
+ Date highValue = aggregateData.getHighValue().compareTo(newData.getHighValue()) >= 0 ? aggregateData
+ .getHighValue() : newData.getHighValue();
+ aggregateData.setHighValue(highValue);
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ if (!aggregateData.isSetBitVectors() || aggregateData.getBitVectors().length() == 0
+ || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ } else {
+ NumDistinctValueEstimator oldEst = NumDistinctValueEstimatorFactory
+ .getNumDistinctValueEstimator(aggregateData.getBitVectors());
+ NumDistinctValueEstimator newEst = NumDistinctValueEstimatorFactory
+ .getNumDistinctValueEstimator(newData.getBitVectors());
+ long ndv = -1;
+ if (oldEst.canMerge(newEst)) {
+ oldEst.mergeEstimators(newEst);
+ ndv = oldEst.estimateNumDistinctValues();
+ aggregateData.setBitVectors(oldEst.serialize());
+ } else {
+ ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
+ }
+ LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
+ + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
+ aggregateData.setNumDVs(ndv);
+ }
+ }
+}
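
For context on how this merger is exercised: the aggregate and incoming statistics arrive wrapped in ColumnStatisticsObj around the Thrift-generated stats structs. A minimal sketch, assuming the Thrift-generated constructors and union setters of the metastore API (the column name, day values, and counts are illustrative only):

    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    import org.apache.hadoop.hive.metastore.api.Date;
    import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
    import org.apache.hadoop.hive.metastore.columnstats.merge.DateColumnStatsMerger;

    // Aggregate side: days-since-epoch range [17000, 17100], 5 nulls, 40 distinct values.
    DateColumnStatsData aggDate = new DateColumnStatsData(5, 40);
    aggDate.setLowValue(new Date(17000));
    aggDate.setHighValue(new Date(17100));
    ColumnStatisticsData aggData = new ColumnStatisticsData();
    aggData.setDateStats(aggDate);
    ColumnStatisticsObj aggregate = new ColumnStatisticsObj("ds", "date", aggData);

    // Incoming side: range [16990, 17050], 2 nulls, 35 distinct values.
    DateColumnStatsData newDate = new DateColumnStatsData(2, 35);
    newDate.setLowValue(new Date(16990));
    newDate.setHighValue(new Date(17050));
    ColumnStatisticsData newData = new ColumnStatisticsData();
    newData.setDateStats(newDate);
    ColumnStatisticsObj incoming = new ColumnStatisticsObj("ds", "date", newData);

    // With no bit vectors on either side, the merger takes min/max of the ranges,
    // sums the null counts, and keeps max(40, 35) as the NDV estimate.
    new DateColumnStatsMerger().merge(aggregate, incoming);
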
http://git-wip-us.apache.org/repos/asf/hive/blob/5e061557/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMerger.java
new file mode 100644
index 0000000..4e8e129
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DecimalColumnStatsMerger.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+
+public class DecimalColumnStatsMerger extends ColumnStatsMerger {
+ @Override
+ public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ DecimalColumnStatsData aggregateData = aggregateColStats.getStatsData().getDecimalStats();
+ DecimalColumnStatsData newData = newColStats.getStatsData().getDecimalStats();
+ Decimal lowValue = aggregateData.getLowValue() != null
+ && (aggregateData.getLowValue().compareTo(newData.getLowValue()) > 0) ? aggregateData
+ .getLowValue() : newData.getLowValue();
+ aggregateData.setLowValue(lowValue);
+ Decimal highValue = aggregateData.getHighValue() != null
+ && (aggregateData.getHighValue().compareTo(newData.getHighValue()) > 0) ? aggregateData
+ .getHighValue() : newData.getHighValue();
+ aggregateData.setHighValue(highValue);
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ if (!aggregateData.isSetBitVectors() || aggregateData.getBitVectors().length() == 0
+ || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ } else {
+ NumDistinctValueEstimator oldEst = NumDistinctValueEstimatorFactory
+ .getNumDistinctValueEstimator(aggregateData.getBitVectors());
+ NumDistinctValueEstimator newEst = NumDistinctValueEstimatorFactory
+ .getNumDistinctValueEstimator(newData.getBitVectors());
+ long ndv = -1;
+ if (oldEst.canMerge(newEst)) {
+ oldEst.mergeEstimators(newEst);
+ ndv = oldEst.estimateNumDistinctValues();
+ aggregateData.setBitVectors(oldEst.serialize());
+ } else {
+ ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
+ }
+ LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
+ + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
+ aggregateData.setNumDVs(ndv);
+ }
+ }
+}
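
One detail worth flagging in the decimal variant: both ternaries above keep whichever side compares greater, so the merged lowValue ends up being the larger of the two lows, and a null value on the incoming side is dereferenced by compareTo. A hedged sketch of a min/max selection with null guards, matching the compareTo-based approach used above (an illustrative correction, not part of the committed patch):

    import org.apache.hadoop.hive.metastore.api.Decimal;

    private static Decimal min(Decimal a, Decimal b) {
      if (a == null) return b;
      if (b == null) return a;
      return a.compareTo(b) <= 0 ? a : b;
    }

    private static Decimal max(Decimal a, Decimal b) {
      if (a == null) return b;
      if (b == null) return a;
      return a.compareTo(b) >= 0 ? a : b;
    }

    // Inside merge():
    aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue()));
    aggregateData.setHighValue(max(aggregateData.getHighValue(), newData.getHighValue()));
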
http://git-wip-us.apache.org/repos/asf/hive/blob/5e061557/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DoubleColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DoubleColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DoubleColumnStatsMerger.java
new file mode 100644
index 0000000..4ef5c39
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/DoubleColumnStatsMerger.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+
+public class DoubleColumnStatsMerger extends ColumnStatsMerger {
+ @Override
+ public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ DoubleColumnStatsData aggregateData = aggregateColStats.getStatsData().getDoubleStats();
+ DoubleColumnStatsData newData = newColStats.getStatsData().getDoubleStats();
+ aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+ aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ if (!aggregateData.isSetBitVectors() || aggregateData.getBitVectors().length() == 0
+ || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ } else {
+ NumDistinctValueEstimator oldEst = NumDistinctValueEstimatorFactory
+ .getNumDistinctValueEstimator(aggregateData.getBitVectors());
+ NumDistinctValueEstimator newEst = NumDistinctValueEstimatorFactory
+ .getNumDistinctValueEstimator(newData.getBitVectors());
+ long ndv = -1;
+ if (oldEst.canMerge(newEst)) {
+ oldEst.mergeEstimators(newEst);
+ ndv = oldEst.estimateNumDistinctValues();
+ aggregateData.setBitVectors(oldEst.serialize());
+ } else {
+ ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
+ }
+ LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
+ + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
+ aggregateData.setNumDVs(ndv);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5e061557/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/LongColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/LongColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/LongColumnStatsMerger.java
new file mode 100644
index 0000000..acf7f03
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/LongColumnStatsMerger.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+
+public class LongColumnStatsMerger extends ColumnStatsMerger {
+ @Override
+ public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ LongColumnStatsData aggregateData = aggregateColStats.getStatsData().getLongStats();
+ LongColumnStatsData newData = newColStats.getStatsData().getLongStats();
+ aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+ aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ if (!aggregateData.isSetBitVectors() || aggregateData.getBitVectors().length() == 0
+ || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ } else {
+ NumDistinctValueEstimator oldEst = NumDistinctValueEstimatorFactory
+ .getNumDistinctValueEstimator(aggregateData.getBitVectors());
+ NumDistinctValueEstimator newEst = NumDistinctValueEstimatorFactory
+ .getNumDistinctValueEstimator(newData.getBitVectors());
+ long ndv = -1;
+ if (oldEst.canMerge(newEst)) {
+ oldEst.mergeEstimators(newEst);
+ ndv = oldEst.estimateNumDistinctValues();
+ aggregateData.setBitVectors(oldEst.serialize());
+ } else {
+ ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
+ }
+ LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
+ + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
+ aggregateData.setNumDVs(ndv);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5e061557/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/StringColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/StringColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/StringColumnStatsMerger.java
new file mode 100644
index 0000000..b3cd33c
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/merge/StringColumnStatsMerger.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.columnstats.merge;
+
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+
+public class StringColumnStatsMerger extends ColumnStatsMerger {
+ @Override
+ public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ StringColumnStatsData aggregateData = aggregateColStats.getStatsData().getStringStats();
+ StringColumnStatsData newData = newColStats.getStatsData().getStringStats();
+ aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+ aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ if (!aggregateData.isSetBitVectors() || aggregateData.getBitVectors().length() == 0
+ || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ } else {
+ NumDistinctValueEstimator oldEst = NumDistinctValueEstimatorFactory
+ .getNumDistinctValueEstimator(aggregateData.getBitVectors());
+ NumDistinctValueEstimator newEst = NumDistinctValueEstimatorFactory
+ .getNumDistinctValueEstimator(newData.getBitVectors());
+ long ndv = -1;
+ if (oldEst.canMerge(newEst)) {
+ oldEst.mergeEstimators(newEst);
+ ndv = oldEst.estimateNumDistinctValues();
+ aggregateData.setBitVectors(oldEst.serialize());
+ } else {
+ ndv = Math.max(aggregateData.getNumDVs(), newData.getNumDVs());
+ }
+ LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
+ + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
+ aggregateData.setNumDVs(ndv);
+ }
+ }
+}
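
The bit-vector branch of merge() is byte-for-byte identical across all five mergers above. A compact sketch of that shared pattern as a hypothetical helper (the helper itself is not part of this patch; the estimator calls mirror the ones used above, and a fuller version would also hand back oldEst.serialize() so the caller can refresh the stored bit vectors):

    import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
    import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;

    static long mergeNumDVs(String aggBitVectors, String newBitVectors, long aggNdv, long newNdv) {
      if (aggBitVectors == null || aggBitVectors.length() == 0
          || newBitVectors == null || newBitVectors.length() == 0) {
        // Without sketches on both sides, the best available estimate is the larger of the two.
        return Math.max(aggNdv, newNdv);
      }
      NumDistinctValueEstimator oldEst =
          NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(aggBitVectors);
      NumDistinctValueEstimator newEst =
          NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(newBitVectors);
      if (oldEst.canMerge(newEst)) {
        oldEst.mergeEstimators(newEst);
        return oldEst.estimateNumDistinctValues();
      }
      return Math.max(aggNdv, newNdv);
    }
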
http://git-wip-us.apache.org/repos/asf/hive/blob/5e061557/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java
deleted file mode 100644
index 2db5c38..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hive.metastore.hbase;
-
-import com.google.common.primitives.Longs;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hive.common.util.BloomFilter;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Filter for scanning aggregates stats table
- */
-public class AggrStatsInvalidatorFilter extends FilterBase {
- private static final Logger LOG =
- LoggerFactory.getLogger(AggrStatsInvalidatorFilter.class.getName());
- private final List<HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry> entries;
- private final long runEvery;
- private final long maxCacheEntryLife;
- // This class is not serializable, so I realize transient doesn't mean anything. It's just to
- // communicate that we don't serialize this and ship it across to the filter on the other end.
- // We use the time the filter is actually instantiated in HBase.
- private transient long now;
-
- public static Filter parseFrom(byte[] serialized) throws DeserializationException {
- try {
- return new AggrStatsInvalidatorFilter(
- HbaseMetastoreProto.AggrStatsInvalidatorFilter.parseFrom(serialized));
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- }
-
- /**
- * @param proto Protocol buffer representation of this filter.
- */
- AggrStatsInvalidatorFilter(HbaseMetastoreProto.AggrStatsInvalidatorFilter proto) {
- this.entries = proto.getToInvalidateList();
- this.runEvery = proto.getRunEvery();
- this.maxCacheEntryLife = proto.getMaxCacheEntryLife();
- now = System.currentTimeMillis();
- }
-
- @Override
- public byte[] toByteArray() throws IOException {
- return HbaseMetastoreProto.AggrStatsInvalidatorFilter.newBuilder()
- .addAllToInvalidate(entries)
- .setRunEvery(runEvery)
- .setMaxCacheEntryLife(maxCacheEntryLife)
- .build()
- .toByteArray();
- }
-
- @Override
- public boolean filterAllRemaining() throws IOException {
- return false;
- }
-
- @Override
- public ReturnCode filterKeyValue(Cell cell) throws IOException {
- // Is this the partition we want?
- if (Arrays.equals(CellUtil.cloneQualifier(cell), HBaseReadWrite.AGGR_STATS_BLOOM_COL)) {
- HbaseMetastoreProto.AggrStatsBloomFilter fromCol =
- HbaseMetastoreProto.AggrStatsBloomFilter.parseFrom(CellUtil.cloneValue(cell));
- BloomFilter bloom = null;
- if (now - maxCacheEntryLife > fromCol.getAggregatedAt()) {
- // It's too old, kill it regardless of whether we were asked to or not.
- return ReturnCode.INCLUDE;
- } else if (now - runEvery * 2 <= fromCol.getAggregatedAt()) {
- // It's too new. We might be stomping on something that was just created. Skip it.
- return ReturnCode.NEXT_ROW;
- } else {
- // Look through each of our entries and see if any of them match.
- for (HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry entry : entries) {
- // First check if we match on db and table match
- if (entry.getDbName().equals(fromCol.getDbName()) &&
- entry.getTableName().equals(fromCol.getTableName())) {
- if (bloom == null) {
- // Now, reconstitute the bloom filter and probe it with each of our partition names
- List<Long> bitsList = fromCol.getBloomFilter().getBitsList();
- bloom = new BloomFilter(Longs.toArray(bitsList), fromCol.getBloomFilter().getNumFuncs());
- }
- if (bloom.test(entry.getPartName().toByteArray())) {
- // This is most likely a match, so mark it and quit looking.
- return ReturnCode.INCLUDE;
- }
- }
- }
- }
- return ReturnCode.NEXT_ROW;
- } else {
- return ReturnCode.NEXT_COL;
- }
- }
-}
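
Turning to the deleted HBase-metastore classes: filterKeyValue() above reconstitutes a Bloom filter from the protobuf column value and probes it with each candidate partition name. The reconstruct-and-probe step in isolation, with placeholder bits and partition name (the BloomFilter calls are the same ones used above):

    import com.google.common.primitives.Longs;
    import org.apache.hive.common.util.BloomFilter;

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    // Bits and hash-function count as they would arrive from the serialized
    // HbaseMetastoreProto.AggrStatsBloomFilter (placeholder values here).
    List<Long> bitsList = Arrays.asList(0L, 0L, 0L, 0L);
    int numFuncs = 3;

    BloomFilter bloom = new BloomFilter(Longs.toArray(bitsList), numFuncs);
    byte[] partName = "ds=2017-08-04".getBytes(StandardCharsets.UTF_8);
    // test() can return false positives but never false negatives, which is why a hit
    // only marks the cache entry as "most likely" invalid rather than definitely stale.
    boolean maybeInvalidated = bloom.test(partName);
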
http://git-wip-us.apache.org/repos/asf/hive/blob/5e061557/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java
deleted file mode 100644
index 2359939..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hive.metastore.hbase;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * A simple metric to count how many times something occurs.
- */
-class Counter {
- private final String name;
- private long cnt;
-
- Counter(String name) {
- this.name = name;
- cnt = 0;
- }
-
- void incr() {
- cnt++;
- }
-
- void clear() {
- cnt = 0;
- }
-
- String dump() {
- StringBuilder bldr = new StringBuilder("Dumping metric: ");
- bldr.append(name).append(' ').append(cnt);
- return bldr.toString();
- }
-
- @VisibleForTesting long getCnt() {
- return cnt;
- }
-
-}
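
Counter is the simplest of the deleted pieces; its intended use was just increment-and-dump, along the lines of the sketch below (the metric name and the LOG handle are placeholders):

    Counter misses = new Counter("aggr stats cache misses");
    misses.incr();
    misses.incr();
    LOG.debug(misses.dump());  // "Dumping metric: aggr stats cache misses 2"
    misses.clear();
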
http://git-wip-us.apache.org/repos/asf/hive/blob/5e061557/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java
deleted file mode 100644
index 696e588..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseConnection.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hive.metastore.hbase;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A connection to HBase. Separated out as an interface so we can slide different transaction
- * managers between our code and HBase.
- */
-public interface HBaseConnection extends Configurable {
-
- /**
- * Connects to HBase. This must be called after {@link #setConf} has been called.
- * @throws IOException
- */
- void connect() throws IOException;
-
- /**
- * Close the connection. No further operations are possible after this is done.
- * @throws IOException
- */
- void close() throws IOException;
-
- /**
- * Begin a transaction.
- * @throws IOException
- */
- void beginTransaction() throws IOException;
-
- /**
- * Commit a transaction
- * @throws IOException indicates the commit has failed
- */
- void commitTransaction() throws IOException;
-
- /**
- * Rollback a transaction
- * @throws IOException
- */
- void rollbackTransaction() throws IOException;
-
- /**
- * Flush commits. A no-op for transaction implementations since they will write at commit time.
- * @param htab Table to flush
- * @throws IOException
- */
- void flush(HTableInterface htab) throws IOException;
-
- /**
- * Create a new table
- * @param tableName name of the table
- * @param columnFamilies name of the column families in the table
- * @throws IOException
- */
- void createHBaseTable(String tableName, List<byte[]> columnFamilies) throws IOException;
-
- /**
- * Fetch an existing HBase table.
- * @param tableName name of the table
- * @return table handle
- * @throws IOException
- */
- HTableInterface getHBaseTable(String tableName) throws IOException;
-
- /**
- * Fetch an existing HBase table and force a connection to it. This should be used only in
- * cases where you want to ensure that the table exists (i.e., at install).
- * @param tableName name of the table
- * @param force if true, force a connection by fetching a non-existent key
- * @return table handle
- * @throws IOException
- */
- HTableInterface getHBaseTable(String tableName, boolean force) throws IOException;
-
-}
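
HBaseConnection was the seam that let the HBase metastore slide different transaction managers between its code and HBase. A hedged sketch of the call sequence a caller would follow against some implementation conn (the table name is a placeholder; HTableInterface comes from the older HBase client API this interface was written against):

    import org.apache.hadoop.hbase.client.HTableInterface;
    import java.io.IOException;

    conn.setConf(hbaseSiteConf);   // a Configuration prepared by the caller
    conn.connect();                // must be called after setConf()
    try {
      conn.beginTransaction();
      HTableInterface htab = conn.getHBaseTable("HBMS_TBLS");  // placeholder table name
      // ... puts/gets against htab ...
      conn.flush(htab);            // no-op for transactional implementations
      conn.commitTransaction();
    } catch (IOException e) {
      conn.rollbackTransaction();
      throw e;
    } finally {
      conn.close();
    }
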
http://git-wip-us.apache.org/repos/asf/hive/blob/5e061557/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java
deleted file mode 100644
index 3c03846..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java
+++ /dev/null
@@ -1,612 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hive.metastore.hbase;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.hbase.PartitionKeyComparator.Operator;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-
-
-/**
- * Utility class for generating the HBase partition filtering plan representation
- * from ExpressionTree.
- * Optimizations to be done -
- * - Case where all partition keys are specified. Should use a get
- *
- * {@link PartitionFilterGenerator} is a visitor on the given filter expression tree. After
- * walking it it produces the HBase execution plan represented by {@link FilterPlan}. See
- * their javadocs for more details.
- */
-class HBaseFilterPlanUtil {
-
- /**
- * Compare two byte arrays.
- *
- * @param ar1
- * first byte array
- * @param ar2
- * second byte array
- * @return -1 if ar1 < ar2, 0 if == , 1 if >
- */
- static int compare(byte[] ar1, byte[] ar2) {
- // null check is not needed, nulls are not passed here
- for (int i = 0; i < ar1.length; i++) {
- if (i == ar2.length) {
- return 1;
- } else {
- if (ar1[i] == ar2[i]) {
- continue;
- } else if (ar1[i] > ar2[i]) {
- return 1;
- } else {
- return -1;
- }
- }
- }
- // ar2 equal until length of ar1.
- if(ar1.length == ar2.length) {
- return 0;
- }
- // ar2 has more bytes
- return -1;
- }
-
- /**
- * Represents the execution plan for hbase to find the set of partitions that
- * match given filter expression.
- * If you have an AND or OR of two expressions, you can determine FilterPlan for each
- * children and then call lhs.and(rhs) or lhs.or(rhs) respectively
- * to generate a new plan for the expression.
- *
- * The execution plan has one or more ScanPlan objects. To get the results the set union of all
- * ScanPlan objects needs to be done.
- */
- public static abstract class FilterPlan {
- abstract FilterPlan and(FilterPlan other);
- abstract FilterPlan or(FilterPlan other);
- abstract List<ScanPlan> getPlans();
- @Override
- public String toString() {
- return getPlans().toString();
- }
-
- }
-
- /**
- * Represents a union/OR of single scan plans (ScanPlan).
- */
- public static class MultiScanPlan extends FilterPlan {
- final ImmutableList<ScanPlan> scanPlans;
-
- public MultiScanPlan(List<ScanPlan> scanPlans){
- this.scanPlans = ImmutableList.copyOf(scanPlans);
- }
-
- @Override
- public FilterPlan and(FilterPlan other) {
- // Convert to disjunctive normal form (DNF), ie OR of ANDs
- // First get a new set of FilterPlans by doing an AND
- // on each ScanPlan in this one with the other FilterPlan
- List<FilterPlan> newFPlans = new ArrayList<FilterPlan>();
- for (ScanPlan splan : getPlans()) {
- newFPlans.add(splan.and(other));
- }
- //now combine scanPlans in multiple new FilterPlans into one
- // MultiScanPlan
- List<ScanPlan> newScanPlans = new ArrayList<ScanPlan>();
- for (FilterPlan fp : newFPlans) {
- newScanPlans.addAll(fp.getPlans());
- }
- return new MultiScanPlan(newScanPlans);
- }
-
- @Override
- public FilterPlan or(FilterPlan other) {
- // just combine the ScanPlans
- List<ScanPlan> newScanPlans = new ArrayList<ScanPlan>(this.getPlans());
- newScanPlans.addAll(other.getPlans());
- return new MultiScanPlan(newScanPlans);
- }
-
- @Override
- public List<ScanPlan> getPlans() {
- return scanPlans;
- }
- }
-
- /**
- * Represents a single Hbase Scan api call
- */
- public static class ScanPlan extends FilterPlan {
-
- public static class ScanMarker {
- final String value;
- /**
- * If inclusive = true, it means that the
- * marker includes those bytes.
- * If it is false, it means the marker starts at the next possible byte array
- * or ends at the next possible byte array
- */
- final boolean isInclusive;
- final String type;
- ScanMarker(String obj, boolean i, String type){
- this.value = obj;
- this.isInclusive = i;
- this.type = type;
- }
- @Override
- public String toString() {
- return "ScanMarker [" + "value=" + value.toString() + ", isInclusive=" + isInclusive +
- ", type=" + type + "]";
- }
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + value.hashCode();
- result = prime * result + (isInclusive ? 1231 : 1237);
- result = prime * result + type.hashCode();
- return result;
- }
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ScanMarker other = (ScanMarker) obj;
- if (!value.equals(other.value))
- return false;
- if (isInclusive != other.isInclusive)
- return false;
- if (type != other.type)
- return false;
- return true;
- }
- }
- public static class ScanMarkerPair {
- public ScanMarkerPair(ScanMarker startMarker, ScanMarker endMarker) {
- this.startMarker = startMarker;
- this.endMarker = endMarker;
- }
- ScanMarker startMarker;
- ScanMarker endMarker;
- }
- // represent Scan start, partition key name -> scanMarkerPair
- Map<String, ScanMarkerPair> markers = new HashMap<String, ScanMarkerPair>();
- List<Operator> ops = new ArrayList<Operator>();
-
- // Get the number of partition key prefixes which can be used in the scan range.
- // For example, if partition key is (year, month, state)
- // 1. year = 2015 and month >= 1 and month < 5
- // year + month can be used in scan range, majorParts = 2
- // 2. year = 2015 and state = 'CA'
- // only year can be used in scan range, majorParts = 1
- // 3. month = 10 and state = 'CA'
- // nothing can be used in scan range, majorParts = 0
- private int getMajorPartsCount(List<FieldSchema> parts) {
- int majorPartsCount = 0;
- while (majorPartsCount<parts.size() && markers.containsKey(parts.get(majorPartsCount).getName())) {
- ScanMarkerPair pair = markers.get(parts.get(majorPartsCount).getName());
- majorPartsCount++;
- if (pair.startMarker!=null && pair.endMarker!=null && pair.startMarker.value.equals(pair
- .endMarker.value) && pair.startMarker.isInclusive && pair.endMarker.isInclusive) {
- // is equal
- continue;
- } else {
- break;
- }
- }
- return majorPartsCount;
- }
- public Filter getFilter(List<FieldSchema> parts) {
- int majorPartsCount = getMajorPartsCount(parts);
- Set<String> majorKeys = new HashSet<String>();
- for (int i=0;i<majorPartsCount;i++) {
- majorKeys.add(parts.get(i).getName());
- }
-
- List<String> names = HBaseUtils.getPartitionNames(parts);
- List<PartitionKeyComparator.Range> ranges = new ArrayList<PartitionKeyComparator.Range>();
- for (Map.Entry<String, ScanMarkerPair> entry : markers.entrySet()) {
- if (names.contains(entry.getKey()) && !majorKeys.contains(entry.getKey())) {
- PartitionKeyComparator.Mark startMark = null;
- if (entry.getValue().startMarker != null) {
- startMark = new PartitionKeyComparator.Mark(entry.getValue().startMarker.value,
- entry.getValue().startMarker.isInclusive);
- }
- PartitionKeyComparator.Mark endMark = null;
- if (entry.getValue().endMarker != null) {
- startMark = new PartitionKeyComparator.Mark(entry.getValue().endMarker.value,
- entry.getValue().endMarker.isInclusive);
- }
- PartitionKeyComparator.Range range = new PartitionKeyComparator.Range(
- entry.getKey(), startMark, endMark);
- ranges.add(range);
- }
- }
-
- if (ranges.isEmpty() && ops.isEmpty()) {
- return null;
- } else {
- return new RowFilter(CompareFilter.CompareOp.EQUAL, new PartitionKeyComparator(
- StringUtils.join(names, ","), StringUtils.join(HBaseUtils.getPartitionKeyTypes(parts), ","),
- ranges, ops));
- }
- }
-
- public void setStartMarker(String keyName, String keyType, String start, boolean isInclusive) {
- if (markers.containsKey(keyName)) {
- markers.get(keyName).startMarker = new ScanMarker(start, isInclusive, keyType);
- } else {
- ScanMarkerPair marker = new ScanMarkerPair(new ScanMarker(start, isInclusive, keyType), null);
- markers.put(keyName, marker);
- }
- }
-
- public ScanMarker getStartMarker(String keyName) {
- if (markers.containsKey(keyName)) {
- return markers.get(keyName).startMarker;
- } else {
- return null;
- }
- }
-
- public void setEndMarker(String keyName, String keyType, String end, boolean isInclusive) {
- if (markers.containsKey(keyName)) {
- markers.get(keyName).endMarker = new ScanMarker(end, isInclusive, keyType);
- } else {
- ScanMarkerPair marker = new ScanMarkerPair(null, new ScanMarker(end, isInclusive, keyType));
- markers.put(keyName, marker);
- }
- }
-
- public ScanMarker getEndMarker(String keyName) {
- if (markers.containsKey(keyName)) {
- return markers.get(keyName).endMarker;
- } else {
- return null;
- }
- }
-
- @Override
- public FilterPlan and(FilterPlan other) {
- List<ScanPlan> newSPlans = new ArrayList<ScanPlan>();
- for (ScanPlan otherSPlan : other.getPlans()) {
- newSPlans.add(this.and(otherSPlan));
- }
- return new MultiScanPlan(newSPlans);
- }
-
- private ScanPlan and(ScanPlan other) {
- // create combined FilterPlan based on existing lhs and rhs plan
- ScanPlan newPlan = new ScanPlan();
- newPlan.markers.putAll(markers);
-
- for (String keyName : other.markers.keySet()) {
- if (newPlan.markers.containsKey(keyName)) {
- // create new scan start
- ScanMarker greaterStartMarker = getComparedMarker(this.getStartMarker(keyName),
- other.getStartMarker(keyName), true);
- if (greaterStartMarker != null) {
- newPlan.setStartMarker(keyName, greaterStartMarker.type, greaterStartMarker.value, greaterStartMarker.isInclusive);
- }
-
- // create new scan end
- ScanMarker lesserEndMarker = getComparedMarker(this.getEndMarker(keyName), other.getEndMarker(keyName),
- false);
- if (lesserEndMarker != null) {
- newPlan.setEndMarker(keyName, lesserEndMarker.type, lesserEndMarker.value, lesserEndMarker.isInclusive);
- }
- } else {
- newPlan.markers.put(keyName, other.markers.get(keyName));
- }
- }
-
- newPlan.ops.addAll(ops);
- newPlan.ops.addAll(other.ops);
- return newPlan;
- }
-
- /**
- * @param lStartMarker
- * @param rStartMarker
- * @param getGreater if true return greater startmarker, else return smaller one
- * @return greater/lesser marker depending on value of getGreater
- */
- @VisibleForTesting
- static ScanMarker getComparedMarker(ScanMarker lStartMarker, ScanMarker rStartMarker,
- boolean getGreater) {
- // if one of them has null bytes, just return other
- if(lStartMarker == null) {
- return rStartMarker;
- } else if (rStartMarker == null) {
- return lStartMarker;
- }
- TypeInfo expectedType =
- TypeInfoUtils.getTypeInfoFromTypeString(lStartMarker.type);
- ObjectInspector outputOI =
- TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType);
- Converter lConverter = ObjectInspectorConverters.getConverter(
- PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
- Converter rConverter = ObjectInspectorConverters.getConverter(
- PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
- Comparable lValue = (Comparable)lConverter.convert(lStartMarker.value);
- Comparable rValue = (Comparable)rConverter.convert(rStartMarker.value);
-
- int compareRes = lValue.compareTo(rValue);
- if (compareRes == 0) {
- // bytes are equal, now compare the isInclusive flags
- if (lStartMarker.isInclusive == rStartMarker.isInclusive) {
- // actually equal, so return any one
- return lStartMarker;
- }
- boolean isInclusive = true;
- // one that does not include the current bytes is greater
- if (getGreater) {
- isInclusive = false;
- }
- // else
- return new ScanMarker(lStartMarker.value, isInclusive, lStartMarker.type);
- }
- if (getGreater) {
- return compareRes == 1 ? lStartMarker : rStartMarker;
- }
- // else
- return compareRes == -1 ? lStartMarker : rStartMarker;
- }
-
-
- @Override
- public FilterPlan or(FilterPlan other) {
- List<ScanPlan> plans = new ArrayList<ScanPlan>(getPlans());
- plans.addAll(other.getPlans());
- return new MultiScanPlan(plans);
- }
-
- @Override
- public List<ScanPlan> getPlans() {
- return Arrays.asList(this);
- }
-
-
- /**
- * @return row suffix - This is appended to db + table, to generate start row for the Scan
- */
- public byte[] getStartRowSuffix(String dbName, String tableName, List<FieldSchema> parts) {
- int majorPartsCount = getMajorPartsCount(parts);
- List<String> majorPartTypes = new ArrayList<String>();
- List<String> components = new ArrayList<String>();
- boolean endPrefix = false;
- for (int i=0;i<majorPartsCount;i++) {
- majorPartTypes.add(parts.get(i).getType());
- ScanMarker marker = markers.get(parts.get(i).getName()).startMarker;
- if (marker != null) {
- components.add(marker.value);
- if (i==majorPartsCount-1) {
- endPrefix = !marker.isInclusive;
- }
- } else {
- components.add(null);
- if (i==majorPartsCount-1) {
- endPrefix = false;
- }
- }
- }
- byte[] bytes = HBaseUtils.buildPartitionKey(dbName, tableName, majorPartTypes, components, endPrefix);
- return bytes;
- }
-
- /**
- * @return row suffix - This is appended to db + table, to generate end row for the Scan
- */
- public byte[] getEndRowSuffix(String dbName, String tableName, List<FieldSchema> parts) {
- int majorPartsCount = getMajorPartsCount(parts);
- List<String> majorPartTypes = new ArrayList<String>();
- List<String> components = new ArrayList<String>();
- boolean endPrefix = false;
- for (int i=0;i<majorPartsCount;i++) {
- majorPartTypes.add(parts.get(i).getType());
- ScanMarker marker = markers.get(parts.get(i).getName()).endMarker;
- if (marker != null) {
- components.add(marker.value);
- if (i==majorPartsCount-1) {
- endPrefix = marker.isInclusive;
- }
- } else {
- components.add(null);
- if (i==majorPartsCount-1) {
- endPrefix = true;
- }
- }
- }
- byte[] bytes = HBaseUtils.buildPartitionKey(dbName, tableName, majorPartTypes, components, endPrefix);
- if (components.isEmpty()) {
- bytes[bytes.length-1]++;
- }
- return bytes;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("ScanPlan:\n");
- for (Map.Entry<String, ScanMarkerPair> entry : markers.entrySet()) {
- sb.append("key=" + entry.getKey() + "[startMarker=" + entry.getValue().startMarker
- + ", endMarker=" + entry.getValue().endMarker + "]");
- }
- return sb.toString();
- }
-
- }
-
- /**
- * Visitor for ExpressionTree.
- * It first generates the ScanPlan for the leaf nodes. The higher level nodes are
- * either AND or OR operations. It then calls FilterPlan.and and FilterPlan.or with
- * the child nodes to generate the plans for higher level nodes.
- */
- @VisibleForTesting
- static class PartitionFilterGenerator extends TreeVisitor {
- private FilterPlan curPlan;
-
- // this tells us if there is a condition that did not get included in the plan
- // such condition would be treated as getting evaluated to TRUE
- private boolean hasUnsupportedCondition = false;
-
- //Need to cache the left plans for the TreeNode. Use IdentityHashMap here
- // as we don't want to dedupe on two TreeNode that are otherwise considered equal
- Map<TreeNode, FilterPlan> leftPlans = new IdentityHashMap<TreeNode, FilterPlan>();
-
- // temporary params for current left and right side plans, for AND, OR
- private FilterPlan rPlan;
-
- private Map<String, String> nameToType = new HashMap<String, String>();
-
- public PartitionFilterGenerator(List<FieldSchema> parts) {
- for (FieldSchema part : parts) {
- nameToType.put(part.getName(), part.getType());
- }
- }
-
- FilterPlan getPlan() {
- return curPlan;
- }
-
- @Override
- protected void beginTreeNode(TreeNode node) throws MetaException {
- // reset the params
- curPlan = rPlan = null;
- }
-
- @Override
- protected void midTreeNode(TreeNode node) throws MetaException {
- leftPlans.put(node, curPlan);
- curPlan = null;
- }
-
- @Override
- protected void endTreeNode(TreeNode node) throws MetaException {
- rPlan = curPlan;
- FilterPlan lPlan = leftPlans.get(node);
- leftPlans.remove(node);
-
- switch (node.getAndOr()) {
- case AND:
- curPlan = lPlan.and(rPlan);
- break;
- case OR:
- curPlan = lPlan.or(rPlan);
- break;
- default:
- throw new AssertionError("Unexpected logical operation " + node.getAndOr());
- }
-
- }
-
-
- @Override
- public void visit(LeafNode node) throws MetaException {
- ScanPlan leafPlan = new ScanPlan();
- curPlan = leafPlan;
-
- // this is a condition on first partition column, so might influence the
- // start and end of the scan
- final boolean INCLUSIVE = true;
- switch (node.operator) {
- case EQUALS:
- leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
- leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
- break;
- case GREATERTHAN:
- leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), !INCLUSIVE);
- break;
- case GREATERTHANOREQUALTO:
- leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
- break;
- case LESSTHAN:
- leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), !INCLUSIVE);
- break;
- case LESSTHANOREQUALTO:
- leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
- break;
- case LIKE:
- leafPlan.ops.add(new Operator(Operator.Type.LIKE, node.keyName, node.value.toString()));
- break;
- case NOTEQUALS:
- case NOTEQUALS2:
- leafPlan.ops.add(new Operator(Operator.Type.NOTEQUALS, node.keyName, node.value.toString()));
- break;
- }
- }
-
- private boolean hasUnsupportedCondition() {
- return hasUnsupportedCondition;
- }
-
- }
-
- public static class PlanResult {
- public final FilterPlan plan;
- public final boolean hasUnsupportedCondition;
- PlanResult(FilterPlan plan, boolean hasUnsupportedCondition) {
- this.plan = plan;
- this.hasUnsupportedCondition = hasUnsupportedCondition;
- }
- }
-
- public static PlanResult getFilterPlan(ExpressionTree exprTree, List<FieldSchema> parts) throws MetaException {
- if (exprTree == null) {
- // TODO: if exprTree is null, we should do what ObjectStore does. See HIVE-10102
- return new PlanResult(new ScanPlan(), true);
- }
- PartitionFilterGenerator pGenerator = new PartitionFilterGenerator(parts);
- exprTree.accept(pGenerator);
- return new PlanResult(pGenerator.getPlan(), pGenerator.hasUnsupportedCondition());
- }
-
-}
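
The FilterPlan javadoc above spells out the AND/OR combination rules; concretely, a filter such as (year = 2015 AND month >= 1) OR state = 'CA' would be planned roughly as in the sketch below. Key names, types, and values are illustrative, the marker calls mirror PartitionFilterGenerator.visit(), and the nested ScanPlan/FilterPlan classes are written as if imported:

    // Leaf: year = 2015  (equal start and end markers, both inclusive)
    ScanPlan yearEq = new ScanPlan();
    yearEq.setStartMarker("year", "int", "2015", true);
    yearEq.setEndMarker("year", "int", "2015", true);

    // Leaf: month >= 1  (start marker only, inclusive)
    ScanPlan monthGe = new ScanPlan();
    monthGe.setStartMarker("month", "int", "1", true);

    // Leaf: state = 'CA'
    ScanPlan stateEq = new ScanPlan();
    stateEq.setStartMarker("state", "string", "CA", true);
    stateEq.setEndMarker("state", "string", "CA", true);

    // AND merges markers key-by-key into a single ScanPlan;
    // OR simply unions ScanPlans into a MultiScanPlan, giving disjunctive normal form.
    FilterPlan plan = yearEq.and(monthGe).or(stateEq);
    // plan.getPlans() now holds two ScanPlans: {year=2015, month>=1} and {state='CA'}.
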
http://git-wip-us.apache.org/repos/asf/hive/blob/5e061557/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
deleted file mode 100644
index 5f89769..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hive.metastore.hbase;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
-import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Deadline;
-import org.apache.hadoop.hive.metastore.ObjectStore;
-import org.apache.hadoop.hive.metastore.RawStore;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.Index;
-import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Role;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A tool to take the contents of an RDBMS based Hive metastore and import it into an HBase based
- * one. To use this, the config files for Hive configured to work with the RDBMS (that is,
- * including the JDBC string, etc.) as well as HBase configuration files must be in the path.
- * There should not be a hive-site.xml that specifies HBaseStore in the path. This tool will then
- * handle connecting to the RDBMS via the {@link org.apache.hadoop.hive.metastore.ObjectStore}
- * and HBase via {@link org.apache.hadoop.hive.metastore.hbase.HBaseStore} and transferring the
- * data.
- *
- * This tool can import an entire metastore or only selected objects. When selecting objects it
- * is necessary to fully specify the object's name. For example, if you want to import the table
- * T in the default database it needs to be identified as default.T. The same is true for
- * functions. When an object is specified, everything under that object will be imported (e.g.
- * if you select database D, then all tables and functions in that database will be
- * imported as well).
- *
- * At this point only tables and partitions are handled in parallel as it is assumed there are
- * relatively few of everything else.
- *
- * Note that HBaseSchemaTool must have already been used to create the appropriate tables in HBase.
- */
-public class HBaseImport {
-
- static final private Logger LOG = LoggerFactory.getLogger(HBaseImport.class.getName());
-
- public static int main(String[] args) {
- try {
- HBaseImport tool = new HBaseImport();
- int rv = tool.init(args);
- if (rv != 0) return rv;
- tool.run();
- } catch (Exception e) {
- System.err.println("Caught exception " + e.getClass().getName() + " with message <" +
- e.getMessage() + ">");
- return 1;
- }
- return 0;
- }
-
- private ThreadLocal<RawStore> rdbmsStore = new ThreadLocal<RawStore>() {
- @Override
- protected RawStore initialValue() {
- if (rdbmsConf == null) {
- throw new RuntimeException("order violation, need to set rdbms conf first");
- }
- RawStore os = new ObjectStore();
- os.setConf(rdbmsConf);
- return os;
- }
- };
-
- private ThreadLocal<RawStore> hbaseStore = new ThreadLocal<RawStore>() {
- @Override
- protected RawStore initialValue() {
- if (hbaseConf == null) {
- throw new RuntimeException("order violation, need to set hbase conf first");
- }
- RawStore hs = new HBaseStore();
- hs.setConf(hbaseConf);
- return hs;
- }
- };
-
- private Configuration rdbmsConf;
- private Configuration hbaseConf;
- private List<Database> dbs;
- private BlockingQueue<Table> partitionedTables;
- private BlockingQueue<String[]> tableNameQueue;
- private BlockingQueue<String[]> indexNameQueue;
- private BlockingQueue<PartQueueEntry> partQueue;
- private boolean writingToQueue, readersFinished;
- private boolean doKerberos, doAll;
- private List<String> rolesToImport, dbsToImport, tablesToImport, functionsToImport;
- private int parallel;
- private int batchSize;
-
- private HBaseImport() {}
-
- @VisibleForTesting
- public HBaseImport(String... args) throws ParseException {
- init(args);
- }
-
- private int init(String... args) throws ParseException {
- Options options = new Options();
-
- doAll = doKerberos = false;
- parallel = 1;
- batchSize = 1000;
-
- options.addOption(OptionBuilder
- .withLongOpt("all")
- .withDescription("Import the full metastore")
- .create('a'));
-
- options.addOption(OptionBuilder
- .withLongOpt("batchsize")
- .withDescription("Number of partitions to read and write in a batch, defaults to 1000")
- .hasArg()
- .create('b'));
-
- options.addOption(OptionBuilder
- .withLongOpt("database")
- .withDescription("Import a single database")
- .hasArgs()
- .create('d'));
-
- options.addOption(OptionBuilder
- .withLongOpt("help")
- .withDescription("You're looking at it")
- .create('h'));
-
- options.addOption(OptionBuilder
- .withLongOpt("function")
- .withDescription("Import a single function")
- .hasArgs()
- .create('f'));
-
- options.addOption(OptionBuilder
- .withLongOpt("kerberos")
- .withDescription("Import all kerberos related objects (master key, tokens)")
- .create('k'));
-
- options.addOption(OptionBuilder
- .withLongOpt("parallel")
- .withDescription("Parallel factor for loading (only applied to tables and partitions), " +
- "defaults to 1")
- .hasArg()
- .create('p'));
-
- options.addOption(OptionBuilder
- .withLongOpt("role")
- .withDescription("Import a single role")
- .hasArgs()
- .create('r'));
-
- options.addOption(OptionBuilder
- .withLongOpt("tables")
- .withDescription("Import a single tables")
- .hasArgs()
- .create('t'));
-
- CommandLine cli = new GnuParser().parse(options, args);
-
- // Process help, if it was asked for, this must be done first
- if (cli.hasOption('h')) {
- printHelp(options);
- return 1;
- }
-
- boolean hasCmd = false;
- // Now process the other command line args
- if (cli.hasOption('a')) {
- hasCmd = true;
- doAll = true;
- }
- if (cli.hasOption('b')) {
- batchSize = Integer.parseInt(cli.getOptionValue('b'));
- }
- if (cli.hasOption('d')) {
- hasCmd = true;
- dbsToImport = Arrays.asList(cli.getOptionValues('d'));
- }
- if (cli.hasOption('f')) {
- hasCmd = true;
- functionsToImport = Arrays.asList(cli.getOptionValues('f'));
- }
- if (cli.hasOption('p')) {
- parallel = Integer.parseInt(cli.getOptionValue('p'));
- }
- if (cli.hasOption('r')) {
- hasCmd = true;
- rolesToImport = Arrays.asList(cli.getOptionValues('r'));
- }
- if (cli.hasOption('k')) {
- doKerberos = true;
- }
- if (cli.hasOption('t')) {
- hasCmd = true;
- tablesToImport = Arrays.asList(cli.getOptionValues('t'));
- }
- if (!hasCmd) {
- printHelp(options);
- return 1;
- }
-
- dbs = new ArrayList<>();
- // We don't want to bound the size of the table queue because we keep it all in memory
- partitionedTables = new LinkedBlockingQueue<>();
- tableNameQueue = new LinkedBlockingQueue<>();
- indexNameQueue = new LinkedBlockingQueue<>();
-
- // Bound the size of this queue so we don't get too much in memory.
- partQueue = new ArrayBlockingQueue<>(parallel * 2);
- return 0;
- }
-
- private void printHelp(Options options) {
- (new HelpFormatter()).printHelp("hbaseschematool", options);
- }
-
- @VisibleForTesting
- void run() throws MetaException, InstantiationException, IllegalAccessException,
- NoSuchObjectException, InvalidObjectException, InterruptedException {
- // Order here is crucial, as you can't add tables until you've added databases, etc.
- init();
- if (doAll || rolesToImport != null) {
- copyRoles();
- }
- if (doAll || dbsToImport != null) {
- copyDbs();
- }
- if (doAll || dbsToImport != null || tablesToImport != null) {
- copyTables();
- copyPartitions();
- copyIndexes();
- }
- if (doAll || dbsToImport != null || functionsToImport != null) {
- copyFunctions();
- }
- if (doAll || doKerberos) {
- copyKerberos();
- }
- }
-
- private void init() throws MetaException, IllegalAccessException, InstantiationException {
- if (rdbmsConf != null) {
- // We've been configured for testing, so don't do anything here.
- return;
- }
- // We're depending on having everything properly in the path
- rdbmsConf = new HiveConf();
- hbaseConf = new HiveConf();
- HiveConf.setVar(hbaseConf, HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
- HBaseStore.class.getName());
- HiveConf.setBoolVar(hbaseConf, HiveConf.ConfVars.METASTORE_FASTPATH, true);
-
- // First get a connection to the RDBMS based store
- rdbmsStore.get().setConf(rdbmsConf);
-
- // Get a connection to the HBase based store
- hbaseStore.get().setConf(hbaseConf);
- }
-
- private void copyRoles() throws NoSuchObjectException, InvalidObjectException, MetaException {
- screen("Copying roles");
- List<String> toCopy = doAll ? rdbmsStore.get().listRoleNames() : rolesToImport;
- for (String roleName : toCopy) {
- Role role = rdbmsStore.get().getRole(roleName);
- screen("Copying role " + roleName);
- hbaseStore.get().addRole(roleName, role.getOwnerName());
- }
- }
-
- private void copyDbs() throws MetaException, NoSuchObjectException, InvalidObjectException {
- screen("Copying databases");
- List<String> toCopy = doAll ? rdbmsStore.get().getAllDatabases() : dbsToImport;
- for (String dbName : toCopy) {
- Database db = rdbmsStore.get().getDatabase(dbName);
- dbs.add(db);
- screen("Copying database " + dbName);
- hbaseStore.get().createDatabase(db);
- }
- }
-
- private void copyTables() throws MetaException, InvalidObjectException, InterruptedException {
- screen("Copying tables");
-
- // Start the parallel threads that will copy the tables
- Thread[] copiers = new Thread[parallel];
- writingToQueue = true;
- for (int i = 0; i < parallel; i++) {
- copiers[i] = new TableCopier();
- copiers[i].start();
- }
-
- // Put tables from the databases we copied into the queue
- for (Database db : dbs) {
- screen("Coyping tables in database " + db.getName());
- for (String tableName : rdbmsStore.get().getAllTables(db.getName())) {
- tableNameQueue.put(new String[]{db.getName(), tableName});
- }
- }
-
- // Now put any specifically requested tables into the queue
- if (tablesToImport != null) {
- for (String compoundTableName : tablesToImport) {
- String[] tn = compoundTableName.split("\\.");
- if (tn.length != 2) {
- error(compoundTableName + " not in proper form. Must be in form dbname.tablename. " +
- "Ignoring this table and continuing.");
- } else {
- tableNameQueue.put(new String[]{tn[0], tn[1]});
- }
- }
- }
- writingToQueue = false;
-
- // Wait until we've finished adding all the tables
- for (Thread copier : copiers) copier.join();
- }
-
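- // Worker that drains tableNameQueue, copying each table (plus any primary and foreign keys) to
- // HBase and queuing partitioned tables for the later partition copy phase.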
- private class TableCopier extends Thread {
- @Override
- public void run() {
- while (writingToQueue || tableNameQueue.size() > 0) {
- try {
- String[] name = tableNameQueue.poll(1, TimeUnit.SECONDS);
- if (name != null) {
- Table table = rdbmsStore.get().getTable(name[0], name[1]);
- // If this has partitions, put it in the queue to fetch partitions for
- if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) {
- partitionedTables.put(table);
- }
- screen("Copying table " + name[0] + "." + name[1]);
- hbaseStore.get().createTable(table);
-
- // See if the table has any constraints, and if so copy those as well
- List<SQLPrimaryKey> pk =
- rdbmsStore.get().getPrimaryKeys(table.getDbName(), table.getTableName());
- if (pk != null && pk.size() > 0) {
- LOG.debug("Found primary keys, adding them");
- hbaseStore.get().addPrimaryKeys(pk);
- }
-
- // Passing null as the target table name results in all of the foreign keys being
- // retrieved.
- List<SQLForeignKey> fks =
- rdbmsStore.get().getForeignKeys(null, null, table.getDbName(), table.getTableName());
- if (fks != null && fks.size() > 0) {
- LOG.debug("Found foreign keys, adding them");
- hbaseStore.get().addForeignKeys(fks);
- }
- }
- } catch (InterruptedException | MetaException | InvalidObjectException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- private void copyIndexes() throws MetaException, InvalidObjectException, InterruptedException {
- screen("Copying indexes");
-
- // Start the parallel threads that will copy the indexes
- Thread[] copiers = new Thread[parallel];
- writingToQueue = true;
- for (int i = 0; i < parallel; i++) {
- copiers[i] = new IndexCopier();
- copiers[i].start();
- }
-
- // Put indexes from the databases we copied into the queue
- for (Database db : dbs) {
- screen("Coyping indexes in database " + db.getName());
- for (String tableName : rdbmsStore.get().getAllTables(db.getName())) {
- for (Index index : rdbmsStore.get().getIndexes(db.getName(), tableName, -1)) {
- indexNameQueue.put(new String[]{db.getName(), tableName, index.getIndexName()});
- }
- }
- }
-
- // Now put any specifically requested tables into the queue
- if (tablesToImport != null) {
- for (String compoundTableName : tablesToImport) {
- String[] tn = compoundTableName.split("\\.");
- if (tn.length != 2) {
- error(compoundTableName + " not in proper form. Must be in form dbname.tablename. " +
- "Ignoring this table and continuing.");
- } else {
- for (Index index : rdbmsStore.get().getIndexes(tn[0], tn[1], -1)) {
- indexNameQueue.put(new String[]{tn[0], tn[1], index.getIndexName()});
- }
- }
- }
- }
-
- writingToQueue = false;
-
- // Wait until we've finished adding all the tables
- for (Thread copier : copiers) copier.join();
- }
-
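- // Worker that drains indexNameQueue and writes each index to the HBase store.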
- private class IndexCopier extends Thread {
- @Override
- public void run() {
- while (writingToQueue || indexNameQueue.size() > 0) {
- try {
- String[] name = indexNameQueue.poll(1, TimeUnit.SECONDS);
- if (name != null) {
- Index index = rdbmsStore.get().getIndex(name[0], name[1], name[2]);
- screen("Copying index " + name[0] + "." + name[1] + "." + name[2]);
- hbaseStore.get().addIndex(index);
- }
- } catch (InterruptedException | MetaException | InvalidObjectException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- /* Partition copying is a little complex. As we went through and copied the tables we put each
- * partitioned table into a queue. We will now go through that queue and add partitions for the
- * tables. We do the finding of partitions and writing of them separately and in parallel.
- * This way, if one table has far more partitions than all of the others, that skew won't
- * hurt us. To avoid pulling all of the partitions for a table into memory, we batch up
- * partitions (by default in batches of 1000) and copy them over in batches.
- */
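- // For example, with the default batchSize of 1000, a table with 2500 partitions is enqueued as
- // three batches of 1000, 1000 and 500 partition names.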
- private void copyPartitions() throws MetaException, NoSuchObjectException,
- InvalidObjectException, InterruptedException {
- screen("Copying partitions");
- readersFinished = false;
- Thread[] readers = new Thread[parallel];
- Thread[] writers = new Thread[parallel];
- for (int i = 0; i < parallel; i++) {
- readers[i] = new PartitionReader();
- readers[i].start();
- writers[i] = new PartitionWriter();
- writers[i].start();
- }
-
- for (Thread reader : readers) reader.join();
- readersFinished = true;
-
- // Wait until we've finished adding all the partitions
- for (Thread writer : writers) writer.join();
- }
-
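- // Worker that lists partition names for each partitioned table and enqueues them in batches of
- // at most batchSize names per PartQueueEntry.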
- private class PartitionReader extends Thread {
- @Override
- public void run() {
- while (partitionedTables.size() > 0) {
- try {
- Table table = partitionedTables.poll(1, TimeUnit.SECONDS);
- if (table != null) {
- screen("Fetching partitions for table " + table.getDbName() + "." +
- table.getTableName());
- List<String> partNames =
- rdbmsStore.get().listPartitionNames(table.getDbName(), table.getTableName(),
- (short) -1);
- if (partNames.size() <= batchSize) {
- LOG.debug("Adding all partition names to queue for " + table.getDbName() + "." +
- table.getTableName());
- partQueue.put(new PartQueueEntry(table.getDbName(), table.getTableName(), partNames));
- } else {
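- // More names than one batch: compute the number of batches as the ceiling of
- // partNames.size() / batchSize and enqueue one sublist per batch.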
- int goUntil = partNames.size() % batchSize == 0 ? partNames.size() / batchSize :
- partNames.size() / batchSize + 1;
- for (int i = 0; i < goUntil; i++) {
- int start = i * batchSize;
- int end = Math.min((i + 1) * batchSize, partNames.size());
- LOG.debug("Adding partitions " + start + " to " + end + " for " + table.getDbName()
- + "." + table.getTableName());
- partQueue.put(new PartQueueEntry(table.getDbName(), table.getTableName(),
- partNames.subList(start, end)));
- }
- }
- }
- } catch (InterruptedException | MetaException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
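- // Worker that takes each batch off partQueue, fetches the full Partition objects from the RDBMS
- // store, and writes them to HBase.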
- private class PartitionWriter extends Thread {
- @Override
- public void run() {
- // This keeps us from throwing exceptions in our raw store calls
- Deadline.registerIfNot(1000000);
- while (!readersFinished || partQueue.size() > 0) {
- try {
- PartQueueEntry entry = partQueue.poll(1, TimeUnit.SECONDS);
- if (entry != null) {
- LOG.info("Writing partitions " + entry.dbName + "." + entry.tableName + "." +
- StringUtils.join(entry.partNames, ':'));
- // Fetch these partitions and write them to HBase
- Deadline.startTimer("hbaseimport");
- List<Partition> parts =
- rdbmsStore.get().getPartitionsByNames(entry.dbName, entry.tableName,
- entry.partNames);
- hbaseStore.get().addPartitions(entry.dbName, entry.tableName, parts);
- Deadline.stopTimer();
- }
- } catch (InterruptedException | MetaException | InvalidObjectException |
- NoSuchObjectException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- private void copyFunctions() throws MetaException, NoSuchObjectException, InvalidObjectException {
- screen("Copying functions");
- // Copy any functions from databases we copied.
- for (Database db : dbs) {
- screen("Copying functions in database " + db.getName());
- for (String funcName : rdbmsStore.get().getFunctions(db.getName(), "*")) {
- copyOneFunction(db.getName(), funcName);
- }
- }
- // Now do any specifically requested functions
- if (functionsToImport != null) {
- for (String compoundFuncName : functionsToImport) {
- String[] fn = compoundFuncName.split("\\.");
- if (fn.length != 2) {
- error(compoundFuncName + " not in proper form. Must be in form dbname.funcname. " +
- "Ignoring this function and continuing.");
- } else {
- copyOneFunction(fn[0], fn[1]);
- }
- }
- }
- }
-
- private void copyOneFunction(String dbName, String funcName) throws MetaException,
- InvalidObjectException {
- Function func = rdbmsStore.get().getFunction(dbName, funcName);
- screen("Copying function " + dbName + "." + funcName);
- hbaseStore.get().createFunction(func);
- }
-
- private void copyKerberos() throws MetaException {
- screen("Copying kerberos related items");
- for (String tokenId : rdbmsStore.get().getAllTokenIdentifiers()) {
- String token = rdbmsStore.get().getToken(tokenId);
- hbaseStore.get().addToken(tokenId, token);
- }
- for (String masterKey : rdbmsStore.get().getMasterKeys()) {
- hbaseStore.get().addMasterKey(masterKey);
- }
- }
-
- private void screen(String msg) {
- LOG.info(msg);
- System.out.println(msg);
- }
-
- private void error(String msg) {
- LOG.error(msg);
- System.err.println("ERROR: " + msg);
- }
-
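- // Lets tests inject pre-configured stores; init() skips its own setup when rdbmsConf is already set.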
- @VisibleForTesting
- void setConnections(RawStore rdbms, RawStore hbase) {
- rdbmsStore.set(rdbms);
- hbaseStore.set(hbase);
- rdbmsConf = rdbms.getConf();
- hbaseConf = hbase.getConf();
- }
-
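- // Simple holder for one batch of partition names belonging to a single table.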
- private static class PartQueueEntry {
- final String dbName;
- final String tableName;
- final List<String> partNames;
-
- PartQueueEntry(String d, String t, List<String> p) {
- dbName = d;
- tableName = t;
- partNames = p;
- }
- }
-
-}