Posted to commits@lens.apache.org by pu...@apache.org on 2016/12/21 12:06:10 UTC
[2/3] lens git commit: Initial changes for Union
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java
index 11eb8f7..64a9626 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java
@@ -98,89 +98,93 @@ class QueriedPhraseContext extends TracksQueriedColumns implements TrackQueriedC
return false;
}
- boolean isEvaluable(CubeQueryContext cubeQl, CandidateFact cfact) throws LensException {
+ /**
+ * TODO union: change CandidateFact to StorageCandidate. Let the callers typecast and send for now.
+ * @param cubeQl the cube query context
+ * @param sc the storage candidate to evaluate against
+ * @return true if this queried phrase is evaluable on the given storage candidate
+ * @throws LensException
+ */
+ public boolean isEvaluable(CubeQueryContext cubeQl, StorageCandidate sc) throws LensException {
// all measures of the queried phrase should be present
for (String msr : queriedMsrs) {
- if (!checkForColumnExistsAndValidForRange(cfact, msr, cubeQl)) {
+ if (!checkForColumnExistsAndValidForRange(sc, msr, cubeQl)) {
return false;
}
}
// all expression columns should be evaluable
for (String exprCol : queriedExprColumns) {
- if (!cubeQl.getExprCtx().isEvaluable(exprCol, cfact)) {
- log.info("expression {} is not evaluable in fact table:{}", expr, cfact);
+ if (!cubeQl.getExprCtx().isEvaluable(exprCol, sc)) {
+ log.info("expression {} is not evaluable in fact table:{}", expr, sc);
return false;
}
}
// all dim-attributes should be present.
for (String col : queriedDimAttrs) {
- if (!cfact.getColumns().contains(col.toLowerCase())) {
+ if (!sc.getColumns().contains(col.toLowerCase())) {
// check if it is available as a reference
- if (!cubeQl.getDeNormCtx().addRefUsage(cfact, col, cubeQl.getCube().getName())) {
- log.info("column {} is not available in fact table:{} ", col, cfact);
+ if (!cubeQl.getDeNormCtx().addRefUsage(sc, col, cubeQl.getCube().getName())) {
+ log.info("column {} is not available in fact table:{} ", col, sc);
return false;
}
- } else if (!isFactColumnValidForRange(cubeQl, cfact, col)) {
- log.info("column {} is not available in range queried in fact {}", col, cfact);
+ } else if (!isFactColumnValidForRange(cubeQl, sc, col)) {
+ log.info("column {} is not available in range queried in fact {}", col, sc);
return false;
}
}
return true;
}
- public static boolean isColumnAvailableInRange(final TimeRange range, Date startTime, Date endTime) {
+ private static boolean isColumnAvailableInRange(final TimeRange range, Date startTime, Date endTime) {
return (isColumnAvailableFrom(range.getFromDate(), startTime)
&& isColumnAvailableTill(range.getToDate(), endTime));
}
- public static boolean isColumnAvailableFrom(@NonNull final Date date, Date startTime) {
+ private static boolean isColumnAvailableFrom(@NonNull final Date date, Date startTime) {
return (startTime == null) || date.equals(startTime) || date.after(startTime);
}
- public static boolean isColumnAvailableTill(@NonNull final Date date, Date endTime) {
+ private static boolean isColumnAvailableTill(@NonNull final Date date, Date endTime) {
return (endTime == null) || date.equals(endTime) || date.before(endTime);
}
- public static boolean isFactColumnValidForRange(CubeQueryContext cubeql, CandidateTable cfact, String col) {
+ public static boolean isFactColumnValidForRange(CubeQueryContext cubeql, StorageCandidate sc, String col) {
for(TimeRange range : cubeql.getTimeRanges()) {
- if (!isColumnAvailableInRange(range, getFactColumnStartTime(cfact, col), getFactColumnEndTime(cfact, col))) {
+ if (!isColumnAvailableInRange(range, getFactColumnStartTime(sc, col), getFactColumnEndTime(sc, col))) {
return false;
}
}
return true;
}
- public static Date getFactColumnStartTime(CandidateTable table, String factCol) {
+ public static Date getFactColumnStartTime(StorageCandidate sc, String factCol) {
Date startTime = null;
- if (table instanceof CandidateFact) {
- for (String key : ((CandidateFact) table).fact.getProperties().keySet()) {
+ for (String key : sc.getTable().getProperties().keySet()) {
if (key.contains(MetastoreConstants.FACT_COL_START_TIME_PFX)) {
String propCol = StringUtils.substringAfter(key, MetastoreConstants.FACT_COL_START_TIME_PFX);
if (factCol.equals(propCol)) {
- startTime = ((CandidateFact) table).fact.getDateFromProperty(key, false, true);
+ startTime = sc.getTable().getDateFromProperty(key, false, true);
}
}
}
- }
return startTime;
}
- public static Date getFactColumnEndTime(CandidateTable table, String factCol) {
+ public static Date getFactColumnEndTime(StorageCandidate sc, String factCol) {
Date endTime = null;
- if (table instanceof CandidateFact) {
- for (String key : ((CandidateFact) table).fact.getProperties().keySet()) {
+ for (String key : sc.getTable().getProperties().keySet()) {
if (key.contains(MetastoreConstants.FACT_COL_END_TIME_PFX)) {
String propCol = StringUtils.substringAfter(key, MetastoreConstants.FACT_COL_END_TIME_PFX);
if (factCol.equals(propCol)) {
- endTime = ((CandidateFact) table).fact.getDateFromProperty(key, false, true);
+ endTime = sc.getTable().getDateFromProperty(key, false, true);
}
}
}
- }
- return endTime;
+ return endTime;
}
- static boolean checkForColumnExistsAndValidForRange(CandidateTable table, String column, CubeQueryContext cubeql) {
- return (table.getColumns().contains(column) && isFactColumnValidForRange(cubeql, table, column));
+ static boolean checkForColumnExistsAndValidForRange(StorageCandidate sc, String column, CubeQueryContext cubeql) {
+ return (sc.getColumns().contains(column) && isFactColumnValidForRange(cubeql, sc, column));
}
+
}
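
For context, a minimal sketch (not part of this patch) of how a caller could drive the new isEvaluable(CubeQueryContext, StorageCandidate) signature, following the TODO above that callers typecast Candidate to StorageCandidate for now. It assumes the class lives in org.apache.lens.cube.parse; the helper class name and the queriedPhrases parameter are illustrative, while getCandidates(), isEvaluable() and addStoragePruningMsg() appear elsewhere in this commit.

import java.util.Iterator;
import java.util.List;

import org.apache.lens.server.api.error.LensException;

class PhraseEvaluationSketch {
  /** Drops storage candidates that cannot evaluate every queried phrase (illustrative only). */
  static void pruneUnevaluable(CubeQueryContext cubeql, List<QueriedPhraseContext> queriedPhrases)
    throws LensException {
    for (Iterator<Candidate> it = cubeql.getCandidates().iterator(); it.hasNext();) {
      Candidate candidate = it.next();
      if (!(candidate instanceof StorageCandidate)) {
        continue; // only StorageCandidate is handled in this initial union change
      }
      StorageCandidate sc = (StorageCandidate) candidate;
      for (QueriedPhraseContext phrase : queriedPhrases) {
        if (!phrase.isEvaluable(cubeql, sc)) {
          it.remove(); // a real resolver would also record a pruning cause via cubeql.addStoragePruningMsg(...)
          break;
        }
      }
    }
  }
}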
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java
index 7298604..bdd6376 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java
@@ -83,4 +83,6 @@ public interface QueryAST {
ASTNode getOrderByAST();
void setOrderByAST(ASTNode node);
+
+ void setJoinAST(ASTNode node);
}
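
The interface now also requires implementations to accept a join AST. Below is a minimal sketch (not part of this patch) of the extra plumbing an implementor might add; ASTNode is Hive's org.apache.hadoop.hive.ql.parse.ASTNode as used elsewhere in lens-cube, and the field, getter and class name here are illustrative.

import org.apache.hadoop.hive.ql.parse.ASTNode;

abstract class SketchQueryAST implements QueryAST {
  private ASTNode joinAST;

  @Override
  public void setJoinAST(ASTNode node) {
    this.joinAST = node;
  }

  // A matching getJoinAST() is not declared in this hunk; shown here only as the natural counterpart.
  public ASTNode getJoinAST() {
    return joinAST;
  }
}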
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java
new file mode 100644
index 0000000..22038f3
--- /dev/null
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.cube.parse;
+
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.*;
+import static org.apache.lens.cube.parse.StorageUtil.*;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import org.apache.lens.cube.metadata.*;
+import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.api.metastore.DataCompletenessChecker;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import com.google.common.collect.Sets;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Represents a fact on a storage table and the dimensions it needs to be joined with to answer the query
+ */
+@Slf4j
+public class StorageCandidate implements Candidate, CandidateTable {
+
+ @Getter
+ private final CubeQueryContext cubeql;
+ private final TimeRangeWriter rangeWriter;
+ private final String processTimePartCol;
+ private final CubeMetastoreClient client;
+ private final String completenessPartCol;
+ private final float completenessThreshold;
+ @Getter
+ private final String name;
+ /**
+ * Valid update periods populated by Phase 1.
+ */
+ private TreeSet<UpdatePeriod> validUpdatePeriods = new TreeSet<>();
+ private Configuration conf = null;
+ private Map<String, Map<String, Float>> incompleteMeasureData = new HashMap<>();
+ private SimpleDateFormat partWhereClauseFormat = null;
+ /**
+ * Participating fact, storage and dimensions for this StorageCandidate
+ */
+ @Getter
+ private CubeFactTable fact;
+ @Getter
+ private String storageName;
+ private Map<Dimension, CandidateDim> dimensions;
+ private Map<TimeRange, String> rangeToWhere = new LinkedHashMap<>();
+ @Getter
+ private CubeInterface cube;
+ /**
+ * Cached fact columns
+ */
+ private Collection<String> factColumns;
+ /**
+ * This map holds tags (a tag refers to one or more measures) that have incomplete data (below the configured threshold).
+ * The value is a map from date string to % completeness.
+ */
+ @Getter
+ @Setter
+ private Map<String, Map<String, Float>> incompleteDataDetails;
+ /**
+ * Partitions calculated by the getPartitions() method.
+ */
+ private Set<FactPartition> storagePartitions = new HashSet<>();
+ /**
+ * Non existing partitions
+ */
+ private Set<String> nonExistingPartitions = new HashSet<>();
+ @Getter
+ private String alias = null;
+
+ public StorageCandidate(CubeInterface cube, CubeFactTable fact, String storageName, String alias,
+ CubeQueryContext cubeql) {
+ if ((cube == null) || (fact == null) || (storageName == null) || (alias == null)) {
+ throw new IllegalArgumentException("Cube, fact, storageName and alias should be non null");
+ }
+ this.cube = cube;
+ this.fact = fact;
+ this.cubeql = cubeql;
+ this.storageName = storageName;
+ this.conf = cubeql.getConf();
+ this.alias = alias;
+ this.name = MetastoreUtil.getFactOrDimtableStorageTableName(fact.getName(), storageName);
+ rangeWriter = ReflectionUtils.newInstance(conf
+ .getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS, CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER,
+ TimeRangeWriter.class), conf);
+ this.processTimePartCol = conf.get(CubeQueryConfUtil.PROCESS_TIME_PART_COL);
+ String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT);
+ if (formatStr != null) {
+ this.partWhereClauseFormat = new SimpleDateFormat(formatStr);
+ }
+ completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL);
+ client = cubeql.getMetastoreClient();
+ completenessThreshold = conf
+ .getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD);
+ }
+
+ @Override
+ public String toHQL() {
+ return null;
+ }
+
+ @Override
+ public QueryAST getQueryAst() {
+ return null;
+ }
+
+ @Override
+ public String getStorageString(String alias) {
+ return null;
+ }
+
+ @Override
+ public AbstractCubeTable getTable() {
+ return fact;
+ }
+
+ @Override
+ public AbstractCubeTable getBaseTable() {
+ return (AbstractCubeTable) cube;
+ }
+
+ @Override
+ public Collection<String> getColumns() {
+ if (factColumns == null) {
+ factColumns = fact.getValidColumns();
+ if (factColumns == null) {
+ factColumns = fact.getAllFieldNames();
+ }
+ }
+ return factColumns;
+ }
+
+ @Override
+ public Date getStartTime() {
+ return fact.getStartTime();
+ }
+
+ @Override
+ public Date getEndTime() {
+ return fact.getEndTime();
+ }
+
+ @Override
+ public double getCost() {
+ return fact.weight();
+ }
+
+ @Override
+ public boolean contains(Candidate candidate) {
+ return this.equals(candidate);
+ }
+
+ @Override
+ public Collection<Candidate> getChildren() {
+ return null;
+ }
+
+ private void updatePartitionStorage(FactPartition part) throws LensException {
+ try {
+ if (client.isStorageTablePartitionACandidate(name, part.getPartSpec()) && (client
+ .factPartitionExists(fact, part, name))) {
+ part.getStorageTables().add(name);
+ part.setFound(true);
+ }
+ } catch (HiveException e) {
+ log.warn("Hive exception while getting storage table partition", e);
+ }
+ }
+
+ /**
+ * Gets FactPartitions for the given fact using the following logic
+ *
+ * 1. Find the max update interval that will be used for the query. Let's assume the time range is 15 Sep to 15 Dec
+ * and the fact has a storage with update periods MONTHLY, DAILY and HOURLY. In this case the data for
+ * [15 Sep - 1 Oct) U [1 Dec - 15 Dec) will be answered by DAILY partitions and [1 Oct - 1 Dec) will be answered by
+ * MONTHLY partitions. The max interval for this query will be MONTHLY.
+ *
+ * 2. Prune storages that do not fall in the query's time range.
+ * {@link CubeMetastoreClient#isStorageTableCandidateForRange(String, Date, Date)}
+ *
+ * 3. Iterate over the max interval. In our case it will give two months, Oct and Nov. Find partitions for these two
+ * months. Check validity of the FactPartitions for Oct and Nov via {@link #updatePartitionStorage(FactPartition)}.
+ * If a partition is missing, try getting partitions for that time range from the other update periods (DAILY, HOURLY).
+ * This is achieved by calling getPartitions() recursively, passing only the remaining update periods (DAILY, HOURLY).
+ *
+ * 4. If the monthly partitions are found, check for look-ahead partitions and call getPartitions recursively for the
+ * remaining time intervals, i.e. [15 Sep - 1 Oct) and [1 Dec - 15 Dec).
+ */
+ private boolean getPartitions(Date fromDate, Date toDate, String partCol, Set<FactPartition> partitions,
+ TreeSet<UpdatePeriod> updatePeriods, boolean addNonExistingParts, boolean failOnPartialData,
+ PartitionRangesForPartitionColumns missingPartitions) throws LensException {
+ if (fromDate.equals(toDate) || fromDate.after(toDate)) {
+ return true;
+ }
+ UpdatePeriod interval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods);
+ if (interval == null) {
+ log.info("No max interval for range: {} to {}", fromDate, toDate);
+ return false;
+ }
+
+ if (interval == UpdatePeriod.CONTINUOUS && rangeWriter.getClass().equals(BetweenTimeRangeWriter.class)) {
+ FactPartition part = new FactPartition(partCol, fromDate, interval, null, partWhereClauseFormat);
+ partitions.add(part);
+ part.getStorageTables().add(name);
+ part = new FactPartition(partCol, toDate, interval, null, partWhereClauseFormat);
+ partitions.add(part);
+ part.getStorageTables().add(name);
+ log.info("Added continuous fact partition for storage table {}", name);
+ return true;
+ }
+
+ if (!client.isStorageTableCandidateForRange(name, fromDate, toDate)) {
+ cubeql.addStoragePruningMsg(this,
+ new CandidateTablePruneCause(CandidateTablePruneCause.CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE));
+ // skipStorageCauses.put(name, new CandidateTablePruneCause.SkipStorageCause(RANGE_NOT_ANSWERABLE));
+ return false;
+ } else if (!client.partColExists(name, partCol)) {
+ log.info("{} does not exist in {}", partCol, name);
+ // skipStorageCauses.put(name, CandidateTablePruneCause.SkipStorageCause.partColDoesNotExist(partCol));
+ List<String> missingCols = new ArrayList<>();
+ missingCols.add(partCol);
+ cubeql.addStoragePruningMsg(this, partitionColumnsMissing(missingCols));
+ return false;
+ }
+
+ Date ceilFromDate = DateUtil.getCeilDate(fromDate, interval);
+ Date floorToDate = DateUtil.getFloorDate(toDate, interval);
+
+ int lookAheadNumParts = conf
+ .getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(interval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS);
+
+ TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, interval, 1).iterator();
+ // add partitions from ceilFrom to floorTo
+ while (iter.hasNext()) {
+ Date dt = iter.next();
+ Date nextDt = iter.peekNext();
+ FactPartition part = new FactPartition(partCol, dt, interval, null, partWhereClauseFormat);
+ updatePartitionStorage(part);
+ log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables());
+ if (part.isFound()) {
+ log.debug("Adding existing partition {}", part);
+ partitions.add(part);
+ log.debug("Looking for look ahead process time partitions for {}", part);
+ if (processTimePartCol == null) {
+ log.debug("processTimePartCol is null");
+ } else if (partCol.equals(processTimePartCol)) {
+ log.debug("part column is process time col");
+ } else if (updatePeriods.first().equals(interval)) {
+ log.debug("Update period is the least update period");
+ } else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) {
+ // see if this is the part of the last-n look ahead partitions
+ log.debug("Not a look ahead partition");
+ } else {
+ log.debug("Looking for look ahead process time partitions for {}", part);
+ // check if finer partitions are required
+ // finer partitions are required if no partitions from
+ // look-ahead process time are present
+ TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts, interval, 1)
+ .iterator();
+ while (processTimeIter.hasNext()) {
+ Date pdt = processTimeIter.next();
+ Date nextPdt = processTimeIter.peekNext();
+ FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, interval, null,
+ partWhereClauseFormat);
+ updatePartitionStorage(processTimePartition);
+ if (processTimePartition.isFound()) {
+ log.debug("Finer parts not required for look-ahead partition :{}", part);
+ } else {
+ log.debug("Looked ahead process time partition {} is not found", processTimePartition);
+ TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
+ newset.addAll(updatePeriods);
+ newset.remove(interval);
+ log.debug("newset of update periods:{}", newset);
+ if (!newset.isEmpty()) {
+ // Get partitions for look ahead process time
+ log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt);
+ Set<FactPartition> processTimeParts = getPartitions(
+ TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn(processTimePartCol).build(),
+ newset, true, false, missingPartitions);
+ log.debug("Look ahead partitions: {}", processTimeParts);
+ TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build();
+ for (FactPartition pPart : processTimeParts) {
+ log.debug("Looking for finer partitions in pPart: {}", pPart);
+ for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) {
+ FactPartition innerPart = new FactPartition(partCol, date, pPart.getPeriod(), pPart,
+ partWhereClauseFormat);
+ updatePartitionStorage(innerPart);
+ innerPart.setFound(pPart.isFound());
+ if (innerPart.isFound()) {
+ partitions.add(innerPart);
+ }
+ }
+ log.debug("added all sub partitions blindly in pPart: {}", pPart);
+ }
+ }
+ }
+ }
+ }
+ } else {
+ log.info("Partition:{} does not exist in any storage table", part);
+ TreeSet<UpdatePeriod> newset = new TreeSet<>();
+ newset.addAll(updatePeriods);
+ newset.remove(interval);
+ if (!getPartitions(dt, nextDt, partCol, partitions, newset, false, failOnPartialData, missingPartitions)) {
+ log.debug("Adding non existing partition {}", part);
+ if (addNonExistingParts) {
+ // Add non existing partitions in all cases, whether we populate all non existing ones or not.
+ missingPartitions.add(part);
+ if (!failOnPartialData) {
+ if (client.isStorageTablePartitionACandidate(name, part.getPartSpec())) {
+ log.info("Storage tables not eligible");
+ return false;
+ }
+ partitions.add(part);
+ part.getStorageTables().add(name);
+ }
+ } else {
+ log.info("No finer granular partitions exist for {}", part);
+ return false;
+ }
+ } else {
+ log.debug("Finer granular partitions added for {}", part);
+ }
+ }
+ }
+ return
+ getPartitions(fromDate, ceilFromDate, partCol, partitions, updatePeriods, addNonExistingParts, failOnPartialData,
+ missingPartitions) && getPartitions(floorToDate, toDate, partCol, partitions, updatePeriods,
+ addNonExistingParts, failOnPartialData, missingPartitions);
+ }
+
+ /**
+ * Finds all the partitions for a storage table with a particular time range.
+ *
+ * Steps:
+ * 1. Get skip storage causes
+ * 2. getPartitions for timeRange and validUpdatePeriods
+ *
+ * @param timeRange : TimeRange to check completeness for. TimeRange consists of start time, end time and the
+ * partition column
+ * @param failOnPartialData : fail fast if the candidate can answer the query only partially
+ * @return true if partitions covering the time range were found and added, false otherwise
+ */
+ @Override
+ public boolean evaluateCompleteness(TimeRange timeRange, boolean failOnPartialData) throws LensException {
+ // Check the measure tags.
+ if (!evaluateMeasuresCompleteness(timeRange)) {
+ log
+ .info("Fact table:{} has partitions with incomplete data: {} for given ranges: {}", fact, incompleteMeasureData,
+ cubeql.getTimeRanges());
+ cubeql.addStoragePruningMsg(this, incompletePartitions(incompleteMeasureData));
+ if (failOnPartialData) {
+ return false;
+ }
+ }
+ PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns();
+ PruneCauses<StorageCandidate> storagePruningMsgs = cubeql.getStoragePruningMsgs();
+ Set<String> unsupportedTimeDims = Sets.newHashSet();
+ Set<String> partColsQueried = Sets.newHashSet();
+ partColsQueried.add(timeRange.getPartitionColumn());
+ StringBuilder extraWhereClauseFallback = new StringBuilder();
+ Set<FactPartition> rangeParts = getPartitions(timeRange, validUpdatePeriods, true, failOnPartialData, missingParts);
+ String partCol = timeRange.getPartitionColumn();
+ boolean partColNotSupported = rangeParts.isEmpty();
+ String storageTableName = getStorageName();
+ if (storagePruningMsgs.containsKey(storageTableName)) {
+ List<CandidateTablePruneCause> causes = storagePruningMsgs.get(storageTableName);
+ // Find the PART_COL_DOES_NOT_EXISTS
+ for (CandidateTablePruneCause cause : causes) {
+ if (cause.getCause().equals(CandidateTablePruneCode.PART_COL_DOES_NOT_EXIST)) {
+ partColNotSupported = cause.getNonExistantPartCols().contains(partCol);
+ }
+ }
+ }
+ TimeRange prevRange = timeRange;
+ String sep = "";
+ while (rangeParts.isEmpty()) {
+ String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol);
+ if (partColNotSupported && !getFact().getColumns().contains(timeDim)) {
+ unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(timeRange.getPartitionColumn()));
+ break;
+ }
+ TimeRange fallBackRange = getFallbackRange(prevRange, this.getFact().getName(), cubeql);
+ log.info("No partitions for range:{}. fallback range: {}", timeRange, fallBackRange);
+ if (fallBackRange == null) {
+ break;
+ }
+ partColsQueried.add(fallBackRange.getPartitionColumn());
+ rangeParts = getPartitions(fallBackRange, validUpdatePeriods, true, failOnPartialData, missingParts);
+ extraWhereClauseFallback.append(sep)
+ .append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim));
+ sep = " AND ";
+ prevRange = fallBackRange;
+ partCol = prevRange.getPartitionColumn();
+ if (!rangeParts.isEmpty()) {
+ break;
+ }
+ }
+ if (!unsupportedTimeDims.isEmpty()) {
+ log.info("Not considering fact table:{} as it doesn't support time dimensions: {}", this.getFact(),
+ unsupportedTimeDims);
+ cubeql.addStoragePruningMsg(this, timeDimNotSupported(unsupportedTimeDims));
+ return false;
+ }
+ Set<String> nonExistingParts = missingParts.toSet(partColsQueried);
+ // TODO union : Relook at this.
+ nonExistingPartitions.addAll(nonExistingParts);
+ if (rangeParts.size() == 0 || (failOnPartialData && !nonExistingParts.isEmpty())) {
+ log.info("No partitions for fallback range:{}", timeRange);
+ return false;
+ }
+ String extraWhere = extraWhereClauseFallback.toString();
+ if (!StringUtils.isEmpty(extraWhere)) {
+ rangeToWhere.put(timeRange, "((" + rangeWriter
+ .getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()), rangeParts)
+ + ") and (" + extraWhere + "))");
+ } else {
+ rangeToWhere.put(timeRange, rangeWriter
+ .getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()), rangeParts));
+ }
+ // Add all the partitions. storagePartitions contains all the partitions for previous time ranges also.
+ this.storagePartitions.addAll(rangeParts);
+ return true;
+ }
+
+ private boolean evaluateMeasuresCompleteness(TimeRange timeRange) throws LensException {
+ String factDataCompletenessTag = fact.getDataCompletenessTag();
+ if (factDataCompletenessTag == null) {
+ log.info("Not checking completeness for the fact table:{} as the dataCompletenessTag is not set", fact);
+ return true;
+ }
+ Set<String> measureTag = new HashSet<>();
+ Map<String, String> tagToMeasureOrExprMap = new HashMap<>();
+
+ processMeasuresFromExprMeasures(cubeql, measureTag, tagToMeasureOrExprMap);
+
+ Set<String> measures = cubeql.getQueriedMsrs();
+ if (measures == null) {
+ measures = new HashSet<>();
+ }
+ for (String measure : measures) {
+ processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap);
+ }
+ //Checking if dataCompletenessTag is set for the fact
+ if (measureTag.isEmpty()) {
+ log.info("No Queried measures with the dataCompletenessTag, hence skipping the availability check");
+ return true;
+ }
+ boolean isDataComplete = false;
+ DataCompletenessChecker completenessChecker = client.getCompletenessChecker();
+ DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+ if (!timeRange.getPartitionColumn().equals(completenessPartCol)) {
+ log.info("Completeness check not available for partCol:{}", timeRange.getPartitionColumn());
+ return true;
+ }
+ Date from = timeRange.getFromDate();
+ Date to = timeRange.getToDate();
+ Map<String, Map<Date, Float>> completenessMap = completenessChecker
+ .getCompleteness(factDataCompletenessTag, from, to, measureTag);
+ if (completenessMap != null && !completenessMap.isEmpty()) {
+ for (Map.Entry<String, Map<Date, Float>> measureCompleteness : completenessMap.entrySet()) {
+ String tag = measureCompleteness.getKey();
+ for (Map.Entry<Date, Float> completenessResult : measureCompleteness.getValue().entrySet()) {
+ if (completenessResult.getValue() < completenessThreshold) {
+ log.info("Completeness for the measure_tag {} is {}, threshold: {}, for the hour {}", tag,
+ completenessResult.getValue(), completenessThreshold, formatter.format(completenessResult.getKey()));
+ String measureorExprFromTag = tagToMeasureOrExprMap.get(tag);
+ Map<String, Float> incompletePartition = incompleteMeasureData.get(measureorExprFromTag);
+ if (incompletePartition == null) {
+ incompletePartition = new HashMap<>();
+ incompleteMeasureData.put(measureorExprFromTag, incompletePartition);
+ }
+ incompletePartition.put(formatter.format(completenessResult.getKey()), completenessResult.getValue());
+ isDataComplete = true;
+ }
+ }
+ }
+ }
+ return isDataComplete;
+ }
+
+ private Set<FactPartition> getPartitions(TimeRange timeRange, TreeSet<UpdatePeriod> updatePeriods,
+ boolean addNonExistingParts, boolean failOnPartialData, PartitionRangesForPartitionColumns missingParts)
+ throws LensException {
+ Set<FactPartition> partitions = new TreeSet<>();
+ if (timeRange != null && timeRange.isCoverableBy(updatePeriods) && getPartitions(timeRange.getFromDate(),
+ timeRange.getToDate(), timeRange.getPartitionColumn(), partitions, updatePeriods, addNonExistingParts,
+ failOnPartialData, missingParts)) {
+ return partitions;
+ }
+ return new TreeSet<>();
+ }
+
+ @Override
+ public Set<FactPartition> getParticipatingPartitions() {
+ return null;
+ }
+
+ @Override
+ public boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr) {
+ return expr.isEvaluable(this);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (super.equals(obj)) {
+ return true;
+ }
+
+ if (obj == null || !(obj instanceof StorageCandidate)) {
+ return false;
+ }
+
+ StorageCandidate storageCandidateObj = (StorageCandidate) obj;
+ //Assuming that the same instance of cube and fact will be used across StorageCandidates and hence relying directly
+ //on == check for these.
+ return (this.cube == storageCandidateObj.cube && this.fact == storageCandidateObj.fact && this.storageName
+ .equals(storageCandidateObj.storageName));
+ }
+
+ @Override
+ public int hashCode() {
+ return this.name.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+
+ public void addValidUpdatePeriod(UpdatePeriod updatePeriod) {
+ this.validUpdatePeriods.add(updatePeriod);
+ }
+}
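
A usage sketch (not part of this patch) tying the pieces above together: one StorageCandidate is built per (fact, storage) pair and evaluateCompleteness() is checked for every queried time range, which is roughly what resolveStoragePartitions() in StorageTableResolver below does. The helper class, method name and the idea of collecting answerable candidates into a set are illustrative; the constructor and evaluateCompleteness() signatures are those defined above, and the alias handling is a simplifying assumption.

import java.util.HashSet;
import java.util.Set;

import org.apache.lens.cube.metadata.CubeFactTable;
import org.apache.lens.cube.metadata.TimeRange;
import org.apache.lens.server.api.error.LensException;

class StorageCandidateSketch {
  /** Builds one StorageCandidate per storage of the fact and keeps those that can cover all time ranges. */
  static Set<StorageCandidate> answerableCandidates(CubeQueryContext cubeql, CubeFactTable fact,
    Set<String> storageNames, boolean failOnPartialData) throws LensException {
    Set<StorageCandidate> answerable = new HashSet<>();
    for (String storageName : storageNames) {
      // Alias is assumed to be the cube alias used in the query; the real resolver may choose differently.
      StorageCandidate sc = new StorageCandidate(cubeql.getCube(), fact, storageName,
        cubeql.getAliasForTableName(cubeql.getCube().getName()), cubeql);
      boolean complete = true;
      for (TimeRange range : cubeql.getTimeRanges()) {
        // evaluateCompleteness() resolves partitions for the range and records pruning causes on cubeql.
        complete &= sc.evaluateCompleteness(range, failOnPartialData);
      }
      if (complete) {
        answerable.add(sc);
      }
    }
    return answerable;
  }
}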
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
index cdf6812..daab851 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -18,33 +18,28 @@
*/
package org.apache.lens.cube.parse;
+import static org.apache.lens.cube.metadata.MetastoreUtil.getFactOrDimtableStorageTableName;
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.TIMEDIM_NOT_SUPPORTED;
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE;
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.noCandidateStorages;
+import static org.apache.lens.cube.parse.StorageUtil.getFallbackRange;
+
import java.text.DateFormat;
-import java.text.ParseException;
import java.text.SimpleDateFormat;
-
import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.lens.cube.metadata.DateUtil.WSPACE;
-import static org.apache.lens.cube.metadata.MetastoreUtil.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCode.*;
import org.apache.lens.cube.metadata.*;
-import org.apache.lens.cube.parse.CandidateTablePruneCause.*;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCause;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCode;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.SkipUpdatePeriodCode;
import org.apache.lens.server.api.error.LensException;
-import org.apache.lens.server.api.metastore.*;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.util.ReflectionUtils;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
import lombok.extern.slf4j.Slf4j;
/**
@@ -57,36 +52,23 @@ class StorageTableResolver implements ContextRewriter {
private final Configuration conf;
private final List<String> supportedStorages;
private final boolean allStoragesSupported;
- CubeMetastoreClient client;
private final boolean failOnPartialData;
private final List<String> validDimTables;
private final Map<CubeFactTable, Map<UpdatePeriod, Set<String>>> validStorageMap = new HashMap<>();
- private String processTimePartCol = null;
private final UpdatePeriod maxInterval;
+ // TODO union : Remove this. All partitions are stored in the StorageCandidate.
private final Map<String, Set<String>> nonExistingPartitions = new HashMap<>();
+ CubeMetastoreClient client;
+ Map<String, List<String>> storagePartMap = new HashMap<String, List<String>>();
+ private String processTimePartCol = null;
private TimeRangeWriter rangeWriter;
private DateFormat partWhereClauseFormat = null;
private PHASE phase;
+ // TODO union : we do not need this. Remove the storage candidate
private HashMap<CubeFactTable, Map<String, SkipStorageCause>> skipStorageCausesPerFact;
private float completenessThreshold;
private String completenessPartCol;
- enum PHASE {
- FACT_TABLES, FACT_PARTITIONS, DIM_TABLE_AND_PARTITIONS;
-
- static PHASE first() {
- return values()[0];
- }
-
- static PHASE last() {
- return values()[values().length - 1];
- }
-
- PHASE next() {
- return values()[(this.ordinal() + 1) % values().length];
- }
- }
-
public StorageTableResolver(Configuration conf) {
this.conf = conf;
this.supportedStorages = getSupportedStorages(conf);
@@ -101,16 +83,16 @@ class StorageTableResolver implements ContextRewriter {
} else {
this.maxInterval = null;
}
- rangeWriter =
- ReflectionUtils.newInstance(conf.getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS,
- CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER, TimeRangeWriter.class), this.conf);
+ rangeWriter = ReflectionUtils.newInstance(conf
+ .getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS, CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER,
+ TimeRangeWriter.class), this.conf);
String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT);
if (formatStr != null) {
partWhereClauseFormat = new SimpleDateFormat(formatStr);
}
this.phase = PHASE.first();
- completenessThreshold = conf.getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD,
- CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD);
+ completenessThreshold = conf
+ .getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD);
completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL);
}
@@ -122,36 +104,23 @@ class StorageTableResolver implements ContextRewriter {
return null;
}
- public boolean isStorageSupported(String storage) {
+ public boolean isStorageSupportedOnDriver(String storage) {
return allStoragesSupported || supportedStorages.contains(storage);
}
- Map<String, List<String>> storagePartMap = new HashMap<String, List<String>>();
-
@Override
public void rewriteContext(CubeQueryContext cubeql) throws LensException {
client = cubeql.getMetastoreClient();
switch (phase) {
- case FACT_TABLES:
- if (!cubeql.getCandidateFacts().isEmpty()) {
- // resolve storage table names
- resolveFactStorageTableNames(cubeql);
+ case STORAGE_TABLES:
+ if (!cubeql.getCandidates().isEmpty()) {
+ resolveStorageTable(cubeql);
}
- cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES);
break;
- case FACT_PARTITIONS:
- if (!cubeql.getCandidateFacts().isEmpty()) {
- // resolve storage partitions
- resolveFactStoragePartitions(cubeql);
- }
- cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES);
- if (client != null && client.isDataCompletenessCheckEnabled()) {
- if (!cubeql.getCandidateFacts().isEmpty()) {
- // resolve incomplete fact partition
- resolveFactCompleteness(cubeql);
- }
- cubeql.pruneCandidateFactSet(CandidateTablePruneCode.INCOMPLETE_PARTITION);
+ case STORAGE_PARTITIONS:
+ if (!cubeql.getCandidates().isEmpty()) {
+ resolveStoragePartitions(cubeql);
}
break;
case DIM_TABLE_AND_PARTITIONS:
@@ -162,13 +131,32 @@ class StorageTableResolver implements ContextRewriter {
cubeql.getAutoJoinCtx().pruneAllPathsForCandidateDims(cubeql.getCandidateDimTables());
cubeql.getAutoJoinCtx().refreshJoinPathColumns();
}
+ // TODO union : What is this? We may not need this as non existing partitions are stored in the StorageCandidate
+ cubeql.setNonexistingParts(nonExistingPartitions);
break;
}
- //Doing this on all three phases. Keep updating cubeql with the current identified missing partitions.
- cubeql.setNonexistingParts(nonExistingPartitions);
phase = phase.next();
}
+ /**
+ * Each candidate in the set is a complex candidate. We will evaluate each one to get
+ * all the partitions needed to answer the query.
+ *
+ * @param cubeql the cube query context
+ */
+ private void resolveStoragePartitions(CubeQueryContext cubeql) throws LensException {
+ Set<Candidate> candidateList = cubeql.getCandidates();
+ for (Candidate candidate : candidateList) {
+ boolean isComplete = true;
+ for (TimeRange range : cubeql.getTimeRanges()) {
+ isComplete &= candidate.evaluateCompleteness(range, failOnPartialData);
+ }
+ if (!isComplete) {
+ // TODO union : Prune this candidate?
+ }
+ }
+ }
+
private void resolveDimStorageTablesAndPartitions(CubeQueryContext cubeql) throws LensException {
Set<Dimension> allDims = new HashSet<Dimension>(cubeql.getDimensions());
for (Aliased<Dimension> dim : cubeql.getOptionalDimensions()) {
@@ -184,8 +172,8 @@ class StorageTableResolver implements ContextRewriter {
CandidateDim candidate = i.next();
CubeDimensionTable dimtable = candidate.dimtable;
if (dimtable.getStorages().isEmpty()) {
- cubeql.addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause(
- CandidateTablePruneCode.MISSING_STORAGES));
+ cubeql
+ .addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES));
i.remove();
continue;
}
@@ -194,7 +182,7 @@ class StorageTableResolver implements ContextRewriter {
boolean foundPart = false;
Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>();
for (String storage : dimtable.getStorages()) {
- if (isStorageSupported(storage)) {
+ if (isStorageSupportedOnDriver(storage)) {
String tableName = getFactOrDimtableStorageTableName(dimtable.getName(), storage).toLowerCase();
if (validDimTables != null && !validDimTables.contains(tableName)) {
log.info("Not considering dim storage table:{} as it is not a valid dim storage", tableName);
@@ -212,9 +200,8 @@ class StorageTableResolver implements ContextRewriter {
}
if (!failOnPartialData || foundPart) {
storageTables.add(tableName);
- String whereClause =
- StorageUtil.getWherePartClause(dim.getTimedDimension(), null,
- StorageConstants.getPartitionsForLatest());
+ String whereClause = StorageUtil
+ .getWherePartClause(dim.getTimedDimension(), null, StorageConstants.getPartitionsForLatest());
whereClauses.put(tableName, whereClause);
} else {
log.info("Not considering dim storage table:{} as no dim partitions exist", tableName);
@@ -239,78 +226,115 @@ class StorageTableResolver implements ContextRewriter {
continue;
}
// pick the first storage table
- candidate.setStorageTable(storageTables.iterator().next());
- candidate.setWhereClause(whereClauses.get(candidate.getStorageTable()));
+ candidate.setStorageName(storageTables.iterator().next());
+ candidate.setWhereClause(whereClauses.get(candidate.getStorageName()));
}
}
}
- // Resolves all the storage table names, which are valid for each updatePeriod
- private void resolveFactStorageTableNames(CubeQueryContext cubeql) throws LensException {
- Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
- skipStorageCausesPerFact = new HashMap<>();
- while (i.hasNext()) {
- CubeFactTable fact = i.next().fact;
- if (fact.getUpdatePeriods().isEmpty()) {
- cubeql.addFactPruningMsgs(fact, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES));
- i.remove();
+ /**
+ * The following storages are removed:
+ * 1. The storage is not supported by the driver.
+ * 2. The storage is not in the valid storage list.
+ * 3. The storage does not fall in any time range of the query.
+ * 4. The storage has no valid update period.
+ *
+ * This method also creates the list of valid update periods and stores it into the {@link StorageCandidate}.
+ *
+ * TODO union : Do the fourth point before the third.
+ */
+ private void resolveStorageTable(CubeQueryContext cubeql) throws LensException {
+ Iterator<Candidate> it = cubeql.getCandidates().iterator();
+ while (it.hasNext()) {
+ Candidate c = it.next();
+ assert (c instanceof StorageCandidate);
+ StorageCandidate sc = (StorageCandidate) c;
+ String storageTable = sc.getStorageName();
+ if (!isStorageSupportedOnDriver(storageTable)) {
+ log.info("Skipping storage: {} as it is not supported", storageTable);
+ cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.UNSUPPORTED_STORAGE));
+ it.remove();
+ continue;
+ }
+ String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(sc.getFact().getName()));
+ List<String> validFactStorageTables = StringUtils.isBlank(str)
+ ? null
+ : Arrays.asList(StringUtils.split(str.toLowerCase(), ","));
+ // Check if the storage table is in the list of valid storages.
+ if (validFactStorageTables != null && !validFactStorageTables.contains(storageTable)) {
+ log.info("Skipping storage table {} as it is not valid", storageTable);
+ cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.INVALID_STORAGE));
+ it.remove();
continue;
}
- Map<UpdatePeriod, Set<String>> storageTableMap = new TreeMap<UpdatePeriod, Set<String>>();
- validStorageMap.put(fact, storageTableMap);
- String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(fact.getName()));
- List<String> validFactStorageTables =
- StringUtils.isBlank(str) ? null : Arrays.asList(StringUtils.split(str.toLowerCase(), ","));
- Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>();
- for (Map.Entry<String, Set<UpdatePeriod>> entry : fact.getUpdatePeriods().entrySet()) {
- String storage = entry.getKey();
- // skip storages that are not supported
- if (!isStorageSupported(storage)) {
- log.info("Skipping storage: {} as it is not supported", storage);
- skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.UNSUPPORTED));
- continue;
+ boolean valid = false;
+ Set<CandidateTablePruneCause.CandidateTablePruneCode> codes = new HashSet<>();
+ for (TimeRange range : cubeql.getTimeRanges()) {
+ boolean columnInRange = client
+ .isStorageTableCandidateForRange(storageTable, range.getFromDate(), range.getToDate());
+ boolean partitionColumnExists = client.partColExists(storageTable, range.getPartitionColumn());
+ valid = columnInRange && partitionColumnExists;
+ if (valid) {
+ break;
}
- String table = getStorageTableName(fact, storage, validFactStorageTables);
- // skip the update period if the storage is not valid
- if (table == null) {
- skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.INVALID));
+ if (!columnInRange) {
+ codes.add(TIME_RANGE_NOT_ANSWERABLE);
continue;
}
- List<String> validUpdatePeriods =
- CubeQueryConfUtil.getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(fact.getName(), storage));
-
- boolean isStorageAdded = false;
- Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<String, SkipUpdatePeriodCode>();
- for (UpdatePeriod updatePeriod : entry.getValue()) {
- if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) {
- log.info("Skipping update period {} for fact {}", updatePeriod, fact);
- skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.QUERY_INTERVAL_BIGGER);
+ // This means fallback is required.
+ if (!partitionColumnExists) {
+ String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(range.getPartitionColumn());
+ if (!sc.getFact().getColumns().contains(timeDim)) {
+ // Not a time dimension so no fallback required.
+ codes.add(TIMEDIM_NOT_SUPPORTED);
continue;
}
- if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) {
- log.info("Skipping update period {} for fact {} for storage {}", updatePeriod, fact, storage);
- skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID);
+ TimeRange fallBackRange = getFallbackRange(range, sc.getFact().getCubeName(), cubeql);
+ if (fallBackRange == null) {
+ log.info("No partitions for range:{}. fallback range: {}", range, fallBackRange);
continue;
}
- Set<String> storageTables = storageTableMap.get(updatePeriod);
- if (storageTables == null) {
- storageTables = new LinkedHashSet<>();
- storageTableMap.put(updatePeriod, storageTables);
+ valid = client
+ .isStorageTableCandidateForRange(storageTable, fallBackRange.getFromDate(), fallBackRange.getToDate());
+ if (valid) {
+ break;
+ } else {
+ codes.add(TIME_RANGE_NOT_ANSWERABLE);
}
- isStorageAdded = true;
- log.debug("Adding storage table:{} for fact:{} for update period {}", table, fact, updatePeriod);
- storageTables.add(table);
}
- if (!isStorageAdded) {
- skipStorageCauses.put(storage, SkipStorageCause.noCandidateUpdatePeriod(skipUpdatePeriodCauses));
+ }
+ if (!valid) {
+ it.remove();
+ for (CandidateTablePruneCode code : codes) {
+ cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(code));
+ }
+ continue;
+ }
+
+ List<String> validUpdatePeriods = CubeQueryConfUtil
+ .getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(sc.getFact().getName(), storageTable));
+ boolean isStorageAdded = false;
+ Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<>();
+
+ // Check for update period.
+ for (UpdatePeriod updatePeriod : sc.getFact().getUpdatePeriods().get(storageTable)) {
+ if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) {
+ log.info("Skipping update period {} for fact {}", updatePeriod, sc.getFact());
+ skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.QUERY_INTERVAL_BIGGER);
+ continue;
+ }
+ if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) {
+ log.info("Skipping update period {} for fact {} for storage {}", updatePeriod, sc.getFact(), storageTable);
+ skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID);
+ continue;
}
+ isStorageAdded = true;
+ sc.addValidUpdatePeriod(updatePeriod);
}
- skipStorageCausesPerFact.put(fact, skipStorageCauses);
- if (storageTableMap.isEmpty()) {
- log.info("Not considering fact table:{} as it does not have any storage tables", fact);
- cubeql.addFactPruningMsgs(fact, noCandidateStorages(skipStorageCauses));
- i.remove();
+ if (!isStorageAdded) {
+ cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.updatePeriodsRejected(skipUpdatePeriodCauses));
+ it.remove();
}
}
}
@@ -321,7 +345,7 @@ class StorageTableResolver implements ContextRewriter {
return set;
}
- String getStorageTableName(CubeFactTable fact, String storage, List<String> validFactStorageTables) {
+ private String getStorageTableName(CubeFactTable fact, String storage, List<String> validFactStorageTables) {
String tableName = getFactOrDimtableStorageTableName(fact.getName(), storage).toLowerCase();
if (validFactStorageTables != null && !validFactStorageTables.contains(tableName)) {
log.info("Skipping storage table {} as it is not valid", tableName);
@@ -330,507 +354,12 @@ class StorageTableResolver implements ContextRewriter {
return tableName;
}
- private TimeRange getFallbackRange(TimeRange range, CandidateFact cfact, CubeQueryContext cubeql)
- throws LensException {
- Cube baseCube = cubeql.getBaseCube();
- ArrayList<String> tableNames = Lists.newArrayList(cfact.fact.getName(), cubeql.getCube().getName());
- if (!cubeql.getCube().getName().equals(baseCube.getName())) {
- tableNames.add(baseCube.getName());
- }
- String fallBackString = null;
- String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn());
- for (String tableName : tableNames) {
- fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters()
- .get(MetastoreConstants.TIMEDIM_RELATION + timedim);
- if (StringUtils.isNotBlank(fallBackString)) {
- break;
- }
- }
- if (StringUtils.isBlank(fallBackString)) {
- return null;
- }
- Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, ""));
- if (!matcher.matches()) {
- return null;
- }
- DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim());
- DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim());
- String relatedTimeDim = matcher.group(1).trim();
- String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim);
- return TimeRange.getBuilder()
- .fromDate(diff2.negativeOffsetFrom(range.getFromDate()))
- .toDate(diff1.negativeOffsetFrom(range.getToDate()))
- .partitionColumn(fallbackPartCol).build();
- }
-
- private void resolveFactStoragePartitions(CubeQueryContext cubeql) throws LensException {
- // Find candidate tables wrt supported storages
- Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
- while (i.hasNext()) {
- CandidateFact cfact = i.next();
- Map<TimeRange, String> whereClauseForFallback = new LinkedHashMap<TimeRange, String>();
- List<FactPartition> answeringParts = new ArrayList<>();
- Map<String, SkipStorageCause> skipStorageCauses = skipStorageCausesPerFact.get(cfact.fact);
- if (skipStorageCauses == null) {
- skipStorageCauses = new HashMap<>();
- }
- PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns();
- boolean noPartsForRange = false;
- Set<String> unsupportedTimeDims = Sets.newHashSet();
- Set<String> partColsQueried = Sets.newHashSet();
- for (TimeRange range : cubeql.getTimeRanges()) {
- partColsQueried.add(range.getPartitionColumn());
- StringBuilder extraWhereClause = new StringBuilder();
- Set<FactPartition> rangeParts = getPartitions(cfact.fact, range, skipStorageCauses, missingParts);
- // If no partitions were found, then we'll fallback.
- String partCol = range.getPartitionColumn();
- boolean partColNotSupported = rangeParts.isEmpty();
- for (String storage : cfact.fact.getStorages()) {
- String storageTableName = getFactOrDimtableStorageTableName(cfact.fact.getName(), storage).toLowerCase();
- partColNotSupported &= skipStorageCauses.containsKey(storageTableName)
- && skipStorageCauses.get(storageTableName).getCause().equals(PART_COL_DOES_NOT_EXIST)
- && skipStorageCauses.get(storageTableName).getNonExistantPartCols().contains(partCol);
- }
- TimeRange prevRange = range;
- String sep = "";
- while (rangeParts.isEmpty()) {
- // TODO: should we add a condition whether on range's partcol any missing partitions are not there
- String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol);
- if (partColNotSupported && !cfact.getColumns().contains(timeDim)) {
- unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(range.getPartitionColumn()));
- break;
- }
- TimeRange fallBackRange = getFallbackRange(prevRange, cfact, cubeql);
- log.info("No partitions for range:{}. fallback range: {}", range, fallBackRange);
- if (fallBackRange == null) {
- break;
- }
- partColsQueried.add(fallBackRange.getPartitionColumn());
- rangeParts = getPartitions(cfact.fact, fallBackRange, skipStorageCauses, missingParts);
- extraWhereClause.append(sep)
- .append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim));
- sep = " AND ";
- prevRange = fallBackRange;
- partCol = prevRange.getPartitionColumn();
- if (!rangeParts.isEmpty()) {
- break;
- }
- }
- whereClauseForFallback.put(range, extraWhereClause.toString());
- if (rangeParts.isEmpty()) {
- log.info("No partitions for fallback range:{}", range);
- noPartsForRange = true;
- continue;
- }
- // If multiple storage tables are part of the same fact,
- // capture range->storage->partitions
- Map<String, LinkedHashSet<FactPartition>> tablePartMap = new HashMap<String, LinkedHashSet<FactPartition>>();
- for (FactPartition factPart : rangeParts) {
- for (String table : factPart.getStorageTables()) {
- if (!tablePartMap.containsKey(table)) {
- tablePartMap.put(table, new LinkedHashSet<>(Collections.singletonList(factPart)));
- } else {
- LinkedHashSet<FactPartition> storagePart = tablePartMap.get(table);
- storagePart.add(factPart);
- }
- }
- }
- cfact.getRangeToStoragePartMap().put(range, tablePartMap);
- cfact.incrementPartsQueried(rangeParts.size());
- answeringParts.addAll(rangeParts);
- cfact.getPartsQueried().addAll(rangeParts);
- }
- if (!unsupportedTimeDims.isEmpty()) {
- log.info("Not considering fact table:{} as it doesn't support time dimensions: {}", cfact.fact,
- unsupportedTimeDims);
- cubeql.addFactPruningMsgs(cfact.fact, timeDimNotSupported(unsupportedTimeDims));
- i.remove();
- continue;
- }
- Set<String> nonExistingParts = missingParts.toSet(partColsQueried);
- if (!nonExistingParts.isEmpty()) {
- addNonExistingParts(cfact.fact.getName(), nonExistingParts);
- }
- if (cfact.getNumQueriedParts() == 0 || (failOnPartialData && (noPartsForRange || !nonExistingParts.isEmpty()))) {
- log.info("Not considering fact table:{} as it could not find partition for given ranges: {}", cfact.fact,
- cubeql.getTimeRanges());
- /*
- * This fact is getting discarded because of any of following reasons:
- * 1. Has missing partitions
- * 2. All Storage tables were skipped for some reasons.
- * 3. Storage tables do not have the update period for the timerange queried.
- */
- if (failOnPartialData && !nonExistingParts.isEmpty()) {
- cubeql.addFactPruningMsgs(cfact.fact, missingPartitions(nonExistingParts));
- } else if (!skipStorageCauses.isEmpty()) {
- CandidateTablePruneCause cause = noCandidateStorages(skipStorageCauses);
- cubeql.addFactPruningMsgs(cfact.fact, cause);
- } else {
- CandidateTablePruneCause cause =
- new CandidateTablePruneCause(NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE);
- cubeql.addFactPruningMsgs(cfact.fact, cause);
- }
- i.remove();
- continue;
- }
- // Map from storage to covering parts
- Map<String, Set<FactPartition>> minimalStorageTables = new LinkedHashMap<String, Set<FactPartition>>();
- StorageUtil.getMinimalAnsweringTables(answeringParts, minimalStorageTables);
- if (minimalStorageTables.isEmpty()) {
- log.info("Not considering fact table:{} as it does not have any storage tables", cfact);
- cubeql.addFactPruningMsgs(cfact.fact, noCandidateStorages(skipStorageCauses));
- i.remove();
- continue;
- }
- Set<String> storageTables = new LinkedHashSet<>();
- storageTables.addAll(minimalStorageTables.keySet());
- cfact.setStorageTables(storageTables);
- // Update range->storage->partitions with time range where clause
- for (TimeRange trange : cfact.getRangeToStoragePartMap().keySet()) {
- Map<String, String> rangeToWhere = new HashMap<>();
- for (Map.Entry<String, Set<FactPartition>> entry : minimalStorageTables.entrySet()) {
- String table = entry.getKey();
- Set<FactPartition> minimalParts = entry.getValue();
-
- LinkedHashSet<FactPartition> rangeParts = cfact.getRangeToStoragePartMap().get(trange).get(table);
- LinkedHashSet<FactPartition> minimalPartsCopy = Sets.newLinkedHashSet();
-
- if (rangeParts != null) {
- minimalPartsCopy.addAll(minimalParts);
- minimalPartsCopy.retainAll(rangeParts);
- }
- if (!StringUtils.isEmpty(whereClauseForFallback.get(trange))) {
- rangeToWhere.put(table, "(("
- + rangeWriter.getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()),
- minimalPartsCopy) + ") and (" + whereClauseForFallback.get(trange) + "))");
- } else {
- rangeToWhere.put(table, rangeWriter.getTimeRangeWhereClause(cubeql,
- cubeql.getAliasForTableName(cubeql.getCube().getName()), minimalPartsCopy));
- }
- }
- cfact.getRangeToStorageWhereMap().put(trange, rangeToWhere);
- }
- log.info("Resolved partitions for fact {}: {} storageTables:{}", cfact, answeringParts, storageTables);
- }
- }
-
- private static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias,
- Set<String> measureTag,
- Map<String, String> tagToMeasureOrExprMap) {
- CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol);
- if (column != null && column.getTags() != null) {
- String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG);
- //Checking if dataCompletenessTag is set for queried measure
- if (dataCompletenessTag != null) {
- measureTag.add(dataCompletenessTag);
- String value = tagToMeasureOrExprMap.get(dataCompletenessTag);
- if (value == null) {
- tagToMeasureOrExprMap.put(dataCompletenessTag, alias);
- } else {
- value = value.concat(",").concat(alias);
- tagToMeasureOrExprMap.put(dataCompletenessTag, value);
- }
- return true;
- }
- }
- return false;
- }
-
- private static void processMeasuresFromExprMeasures(CubeQueryContext cubeql, Set<String> measureTag,
- Map<String, String> tagToMeasureOrExprMap) {
- boolean isExprProcessed;
- String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName());
- for (String expr : cubeql.getQueriedExprsWithMeasures()) {
- isExprProcessed = false;
- for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias)
- .getAllExprs()) {
- if (esc.getTblAliasToColumns().get(cubeAlias) != null) {
- for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) {
- if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) {
- /* This is done to associate the expression with one of the dataCompletenessTag for the measures.
- So, even if the expression is composed of measures with different dataCompletenessTags, we will be
- determining the dataCompleteness from one of the measure and this expression is grouped with the
- other queried measures that have the same dataCompletenessTag. */
- isExprProcessed = true;
- break;
- }
- }
- }
- if (isExprProcessed) {
- break;
- }
- }
- }
- }
-
- private void resolveFactCompleteness(CubeQueryContext cubeql) throws LensException {
- if (client == null || client.getCompletenessChecker() == null || completenessPartCol == null) {
- return;
- }
- DataCompletenessChecker completenessChecker = client.getCompletenessChecker();
- Set<String> measureTag = new HashSet<>();
- Map<String, String> tagToMeasureOrExprMap = new HashMap<>();
-
- processMeasuresFromExprMeasures(cubeql, measureTag, tagToMeasureOrExprMap);
-
- Set<String> measures = cubeql.getQueriedMsrs();
- if (measures == null) {
- measures = new HashSet<>();
- }
- for (String measure : measures) {
- processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap);
- }
- //Checking if dataCompletenessTag is set for the fact
- if (measureTag.isEmpty()) {
- log.info("No Queried measures with the dataCompletenessTag, hence skipping the availability check");
- return;
- }
- Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
- DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
- while (i.hasNext()) {
- CandidateFact cFact = i.next();
- // Map from measure to the map from partition to %completeness
- Map<String, Map<String, Float>> incompleteMeasureData = new HashMap<>();
-
- String factDataCompletenessTag = cFact.fact.getDataCompletenessTag();
- if (factDataCompletenessTag == null) {
- log.info("Not checking completeness for the fact table:{} as the dataCompletenessTag is not set", cFact.fact);
- continue;
- }
- boolean isFactDataIncomplete = false;
- for (TimeRange range : cubeql.getTimeRanges()) {
- if (!range.getPartitionColumn().equals(completenessPartCol)) {
- log.info("Completeness check not available for partCol:{}", range.getPartitionColumn());
- continue;
- }
- Date from = range.getFromDate();
- Date to = range.getToDate();
- Map<String, Map<Date, Float>> completenessMap = completenessChecker.getCompleteness(factDataCompletenessTag,
- from, to, measureTag);
- if (completenessMap != null && !completenessMap.isEmpty()) {
- for (Map.Entry<String, Map<Date, Float>> measureCompleteness : completenessMap.entrySet()) {
- String tag = measureCompleteness.getKey();
- for (Map.Entry<Date, Float> completenessResult : measureCompleteness.getValue().entrySet()) {
- if (completenessResult.getValue() < completenessThreshold) {
- log.info("Completeness for the measure_tag {} is {}, threshold: {}, for the hour {}", tag,
- completenessResult.getValue(), completenessThreshold,
- formatter.format(completenessResult.getKey()));
- String measureorExprFromTag = tagToMeasureOrExprMap.get(tag);
- Map<String, Float> incompletePartition = incompleteMeasureData.get(measureorExprFromTag);
- if (incompletePartition == null) {
- incompletePartition = new HashMap<>();
- incompleteMeasureData.put(measureorExprFromTag, incompletePartition);
- }
- incompletePartition.put(formatter.format(completenessResult.getKey()), completenessResult.getValue());
- isFactDataIncomplete = true;
- }
- }
- }
- }
- }
- if (isFactDataIncomplete) {
- log.info("Fact table:{} has partitions with incomplete data: {} for given ranges: {}", cFact.fact,
- incompleteMeasureData, cubeql.getTimeRanges());
- if (failOnPartialData) {
- i.remove();
- cubeql.addFactPruningMsgs(cFact.fact, incompletePartitions(incompleteMeasureData));
- } else {
- cFact.setDataCompletenessMap(incompleteMeasureData);
- }
- }
- }
- }
-
void addNonExistingParts(String name, Set<String> nonExistingParts) {
nonExistingPartitions.put(name, nonExistingParts);
}
- private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range,
- Map<String, SkipStorageCause> skipStorageCauses,
- PartitionRangesForPartitionColumns missingPartitions) throws LensException {
- try {
- return getPartitions(fact, range, getValidUpdatePeriods(fact), true, failOnPartialData, skipStorageCauses,
- missingPartitions);
- } catch (Exception e) {
- throw new LensException(e);
- }
- }
-
- private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range, TreeSet<UpdatePeriod> updatePeriods,
- boolean addNonExistingParts, boolean failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses,
- PartitionRangesForPartitionColumns missingPartitions)
- throws Exception {
- Set<FactPartition> partitions = new TreeSet<>();
- if (range != null && range.isCoverableBy(updatePeriods)
- && getPartitions(fact, range.getFromDate(), range.getToDate(), range.getPartitionColumn(), partitions,
- updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions)) {
- return partitions;
- } else {
- return new TreeSet<>();
- }
- }
-
- private boolean getPartitions(CubeFactTable fact, Date fromDate, Date toDate, String partCol,
- Set<FactPartition> partitions, TreeSet<UpdatePeriod> updatePeriods,
- boolean addNonExistingParts, boolean failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses,
- PartitionRangesForPartitionColumns missingPartitions)
- throws Exception {
- log.info("getPartitions for {} from fromDate:{} toDate:{}", fact, fromDate, toDate);
- if (fromDate.equals(toDate) || fromDate.after(toDate)) {
- return true;
- }
- UpdatePeriod interval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods);
- if (interval == null) {
- log.info("No max interval for range: {} to {}", fromDate, toDate);
- return false;
- }
- log.debug("Max interval for {} is: {}", fact, interval);
- Set<String> storageTbls = new LinkedHashSet<String>();
- storageTbls.addAll(validStorageMap.get(fact).get(interval));
-
- if (interval == UpdatePeriod.CONTINUOUS && rangeWriter.getClass().equals(BetweenTimeRangeWriter.class)) {
- for (String storageTbl : storageTbls) {
- FactPartition part = new FactPartition(partCol, fromDate, interval, null, partWhereClauseFormat);
- partitions.add(part);
- part.getStorageTables().add(storageTbl);
- part = new FactPartition(partCol, toDate, interval, null, partWhereClauseFormat);
- partitions.add(part);
- part.getStorageTables().add(storageTbl);
- log.info("Added continuous fact partition for storage table {}", storageTbl);
- }
- return true;
- }
-
- Iterator<String> it = storageTbls.iterator();
- while (it.hasNext()) {
- String storageTableName = it.next();
- if (!client.isStorageTableCandidateForRange(storageTableName, fromDate, toDate)) {
- skipStorageCauses.put(storageTableName, new SkipStorageCause(RANGE_NOT_ANSWERABLE));
- it.remove();
- } else if (!client.partColExists(storageTableName, partCol)) {
- log.info("{} does not exist in {}", partCol, storageTableName);
- skipStorageCauses.put(storageTableName, SkipStorageCause.partColDoesNotExist(partCol));
- it.remove();
- }
- }
-
- if (storageTbls.isEmpty()) {
- return false;
- }
- Date ceilFromDate = DateUtil.getCeilDate(fromDate, interval);
- Date floorToDate = DateUtil.getFloorDate(toDate, interval);
-
- int lookAheadNumParts =
- conf.getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(interval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS);
-
- TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, interval, 1).iterator();
- // add partitions from ceilFrom to floorTo
- while (iter.hasNext()) {
- Date dt = iter.next();
- Date nextDt = iter.peekNext();
- FactPartition part = new FactPartition(partCol, dt, interval, null, partWhereClauseFormat);
- log.debug("candidate storage tables for searching partitions: {}", storageTbls);
- updateFactPartitionStorageTablesFrom(fact, part, storageTbls);
- log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables());
- if (part.isFound()) {
- log.debug("Adding existing partition {}", part);
- partitions.add(part);
- log.debug("Looking for look ahead process time partitions for {}", part);
- if (processTimePartCol == null) {
- log.debug("processTimePartCol is null");
- } else if (partCol.equals(processTimePartCol)) {
- log.debug("part column is process time col");
- } else if (updatePeriods.first().equals(interval)) {
- log.debug("Update period is the least update period");
- } else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) {
- // see if this is the part of the last-n look ahead partitions
- log.debug("Not a look ahead partition");
- } else {
- log.debug("Looking for look ahead process time partitions for {}", part);
- // check if finer partitions are required
- // final partitions are required if no partitions from
- // look-ahead
- // process time are present
- TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts,
- interval, 1).iterator();
- while (processTimeIter.hasNext()) {
- Date pdt = processTimeIter.next();
- Date nextPdt = processTimeIter.peekNext();
- FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, interval, null,
- partWhereClauseFormat);
- updateFactPartitionStorageTablesFrom(fact, processTimePartition,
- part.getStorageTables());
- if (processTimePartition.isFound()) {
- log.debug("Finer parts not required for look-ahead partition :{}", part);
- } else {
- log.debug("Looked ahead process time partition {} is not found", processTimePartition);
- TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
- newset.addAll(updatePeriods);
- newset.remove(interval);
- log.debug("newset of update periods:{}", newset);
- if (!newset.isEmpty()) {
- // Get partitions for look ahead process time
- log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt);
- Set<FactPartition> processTimeParts =
- getPartitions(fact, TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn(
- processTimePartCol).build(), newset, true, false, skipStorageCauses, missingPartitions);
- log.debug("Look ahead partitions: {}", processTimeParts);
- TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build();
- for (FactPartition pPart : processTimeParts) {
- log.debug("Looking for finer partitions in pPart: {}", pPart);
- for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) {
- FactPartition innerPart = new FactPartition(partCol, date, pPart.getPeriod(), pPart,
- partWhereClauseFormat);
- updateFactPartitionStorageTablesFrom(fact, innerPart, pPart);
- if (innerPart.isFound()) {
- partitions.add(innerPart);
- }
- }
- log.debug("added all sub partitions blindly in pPart: {}", pPart);
- }
- }
- }
- }
- }
- } else {
- log.info("Partition:{} does not exist in any storage table", part);
- TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
- newset.addAll(updatePeriods);
- newset.remove(interval);
- if (!getPartitions(fact, dt, nextDt, partCol, partitions, newset, false, failOnPartialData, skipStorageCauses,
- missingPartitions)) {
-
- log.debug("Adding non existing partition {}", part);
- if (addNonExistingParts) {
- // Add non existing partitions for all cases of whether we populate all non existing or not.
- missingPartitions.add(part);
- if (!failOnPartialData) {
- Set<String> st = getStorageTablesWithoutPartCheck(part, storageTbls);
- if (st.isEmpty()) {
- log.info("No eligible storage tables");
- return false;
- }
- partitions.add(part);
- part.getStorageTables().addAll(st);
- }
- } else {
- log.info("No finer granual partitions exist for {}", part);
- return false;
- }
- } else {
- log.debug("Finer granual partitions added for {}", part);
- }
- }
- }
- return getPartitions(fact, fromDate, ceilFromDate, partCol, partitions,
- updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions)
- && getPartitions(fact, floorToDate, toDate, partCol, partitions,
- updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions);
- }
-
- private Set<String> getStorageTablesWithoutPartCheck(FactPartition part,
- Set<String> storageTableNames) throws LensException, HiveException {
+ private Set<String> getStorageTablesWithoutPartCheck(FactPartition part, Set<String> storageTableNames)
+ throws LensException, HiveException {
Set<String> validStorageTbls = new HashSet<>();
for (String storageTableName : storageTableNames) {
// skip all storage tables that are not eligible for this partition
@@ -843,21 +372,19 @@ class StorageTableResolver implements ContextRewriter {
return validStorageTbls;
}
- private void updateFactPartitionStorageTablesFrom(CubeFactTable fact,
- FactPartition part, Set<String> storageTableNames) throws LensException, HiveException, ParseException {
- for (String storageTableName : storageTableNames) {
- // skip all storage tables for which are not eligible for this partition
- if (client.isStorageTablePartitionACandidate(storageTableName, part.getPartSpec())
- && (client.factPartitionExists(fact, part, storageTableName))) {
- part.getStorageTables().add(storageTableName);
- part.setFound(true);
- }
+ enum PHASE {
+ STORAGE_TABLES, STORAGE_PARTITIONS, DIM_TABLE_AND_PARTITIONS;
+
+ static PHASE first() {
+ return values()[0];
+ }
+
+ static PHASE last() {
+ return values()[values().length - 1];
}
- }
- private void updateFactPartitionStorageTablesFrom(CubeFactTable fact,
- FactPartition part, FactPartition pPart) throws LensException, HiveException, ParseException {
- updateFactPartitionStorageTablesFrom(fact, part, pPart.getStorageTables());
- part.setFound(part.isFound() && pPart.isFound());
+ PHASE next() {
+ return values()[(this.ordinal() + 1) % values().length];
+ }
}
}
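
For illustration, the new PHASE enum added above is a small state machine over the resolver passes (storage tables, then storage partitions, then dim tables and partitions). A minimal sketch, assuming it drives repeated rewrite passes; the loop body is illustrative only, not the actual resolver code:

PHASE phase = PHASE.first();                  // STORAGE_TABLES
while (true) {
  switch (phase) {
    case STORAGE_TABLES:
      // pick candidate storage tables
      break;
    case STORAGE_PARTITIONS:
      // resolve partitions for the selected storage tables
      break;
    case DIM_TABLE_AND_PARTITIONS:
      // resolve dimension tables and their partitions
      break;
  }
  if (phase == PHASE.last()) {                // DIM_TABLE_AND_PARTITIONS is the last phase
    break;
  }
  phase = phase.next();                       // next() wraps modulo values().length
}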
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
index f9636d1..4f5d405 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -18,13 +18,19 @@
*/
package org.apache.lens.cube.parse;
+import static org.apache.lens.cube.metadata.DateUtil.WSPACE;
+
import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
-import org.apache.lens.cube.metadata.FactPartition;
-import org.apache.lens.cube.metadata.StorageConstants;
+import org.apache.lens.cube.metadata.*;
+import org.apache.lens.server.api.error.LensException;
import org.apache.commons.lang.StringUtils;
+import com.google.common.collect.Lists;
+
public final class StorageUtil {
private StorageUtil() {
@@ -69,8 +75,8 @@ public final class StorageUtil {
String sep = "";
for (String timePartCol : timedDimensions) {
if (!timePartCol.equals(partCol)) {
- sb.append(sep).append(alias).append(".").append(timePartCol)
- .append(" != '").append(StorageConstants.LATEST_PARTITION_VALUE).append("'");
+ sb.append(sep).append(alias).append(".").append(timePartCol).append(" != '")
+ .append(StorageConstants.LATEST_PARTITION_VALUE).append("'");
sep = " AND ";
}
}
@@ -82,15 +88,11 @@ public final class StorageUtil {
String sep = "((";
for (String clause : clauses) {
if (clause != null && !clause.isEmpty()) {
- sb
- .append(sep)
- .append(clause);
+ sb.append(sep).append(clause);
sep = ") AND (";
}
}
- return sb
- .append(sep.equals("((") ? "" : "))")
- .toString();
+ return sb.append(sep.equals("((") ? "" : "))").toString();
}
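
For reference, the clause-joining code in the hunk above wraps each non-empty clause in parentheses and ANDs them together. Worked examples of the separator handling (derived from the code; the clause strings themselves are hypothetical):

// ["dt >= '2016-01-01'", "dt < '2016-02-01'"] -> "((dt >= '2016-01-01') AND (dt < '2016-02-01'))"
// ["dt >= '2016-01-01'"]                      -> "((dt >= '2016-01-01'))"
// [] or only null/empty clauses               -> ""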
/**
@@ -161,4 +163,108 @@ public final class StorageUtil {
return null;
}
}
+
+ /**
+ * Gets the fallback time range for the queried range, using the time-dimension relation
+ * configured on the fact table, the queried cube or the base cube.
+ * @param range time range queried on the original partition column
+ * @param factName fact table whose parameters are checked first for the relation
+ * @param cubeql cube query context, used to resolve the cube, base cube and metastore client
+ * @return time range on the related partition column, or null if no relation is configured
+ * @throws LensException
+ */
+ public static TimeRange getFallbackRange(TimeRange range, String factName, CubeQueryContext cubeql)
+ throws LensException {
+ Cube baseCube = cubeql.getBaseCube();
+ ArrayList<String> tableNames = Lists.newArrayList(factName, cubeql.getCube().getName());
+ if (!cubeql.getCube().getName().equals(baseCube.getName())) {
+ tableNames.add(baseCube.getName());
+ }
+ String fallBackString = null;
+ String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn());
+ for (String tableName : tableNames) {
+ fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters()
+ .get(MetastoreConstants.TIMEDIM_RELATION + timedim);
+ if (StringUtils.isNotBlank(fallBackString)) {
+ break;
+ }
+ }
+ if (StringUtils.isBlank(fallBackString)) {
+ return null;
+ }
+ Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, ""));
+ if (!matcher.matches()) {
+ return null;
+ }
+ DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim());
+ DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim());
+ String relatedTimeDim = matcher.group(1).trim();
+ String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim);
+ return TimeRange.getBuilder().fromDate(diff2.negativeOffsetFrom(range.getFromDate()))
+ .toDate(diff1.negativeOffsetFrom(range.getToDate())).partitionColumn(fallbackPartCol).build();
+ }
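
To illustrate getFallbackRange: the table parameter looked up is MetastoreConstants.TIMEDIM_RELATION + timedim, and its value is expected to look like <related timedim>+[<diff1>,<diff2>]. The concrete value below is hypothetical; the pattern and the way its groups are used come from the method above (the java.util.regex imports are already added in this file):

// Hypothetical relation value, whitespace already stripped as in getFallbackRange
String fallBackString = "pt+[-20days,-10days]";
Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString);
if (matcher.matches()) {
  // matcher.group(1) -> "pt"       related time dimension; mapped to its partition column
  // matcher.group(2) -> "-20days"  parsed as diff1 and applied as a negative offset to the to-date
  // matcher.group(3) -> "-10days"  parsed as diff2 and applied as a negative offset to the from-date
}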
+
+ /**
+ * Checks whether the given cube column is a measure carrying a data-completeness tag. If it is,
+ * the tag is added to measureTag and the given alias is mapped to it in tagToMeasureOrExprMap.
+ * See: {@link org.apache.lens.server.api.metastore.DataCompletenessChecker}
+ * @param cubeql cube query context
+ * @param cubeCol queried cube column to check
+ * @param alias measure name or expression to associate with the tag
+ * @param measureTag set collecting the data-completeness tags of queried measures
+ * @param tagToMeasureOrExprMap map from data-completeness tag to the measures/expressions carrying it
+ * @return true if the column carries a data-completeness tag, false otherwise
+ */
+ public static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias,
+ Set<String> measureTag, Map<String, String> tagToMeasureOrExprMap) {
+ CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol);
+ if (column != null && column.getTags() != null) {
+ String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG);
+ // Check if dataCompletenessTag is set for the queried measure
+ if (dataCompletenessTag != null) {
+ measureTag.add(dataCompletenessTag);
+ String value = tagToMeasureOrExprMap.get(dataCompletenessTag);
+ if (value == null) {
+ tagToMeasureOrExprMap.put(dataCompletenessTag, alias);
+ } else {
+ value = value.concat(",").concat(alias);
+ tagToMeasureOrExprMap.put(dataCompletenessTag, value);
+ }
+ return true;
+ }
+ }
+ return false;
+ }
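
A usage sketch for the helper above: calling it for two measures that share the same data-completeness tag accumulates both aliases under that tag. The measure names, the tag value and the cubeql variable below are hypothetical:

Set<String> measureTag = new HashSet<>();
Map<String, String> tagToMeasureOrExprMap = new HashMap<>();
StorageUtil.processCubeColForDataCompleteness(cubeql, "msr1", "msr1", measureTag, tagToMeasureOrExprMap);
StorageUtil.processCubeColForDataCompleteness(cubeql, "msr2", "msr2", measureTag, tagToMeasureOrExprMap);
// measureTag            -> {"completeness_tag_x"}   (assuming both measures carry this tag)
// tagToMeasureOrExprMap -> {"completeness_tag_x" -> "msr1,msr2"}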
+
+ /**
+ * Associates each queried expression containing measures with the data-completeness tag of one
+ * of its underlying measures, so that the expression is grouped with measures sharing that tag.
+ * @param cubeql cube query context
+ * @param measureTag set collecting the data-completeness tags of queried measures
+ * @param tagToMeasureOrExprMap map from data-completeness tag to the measures/expressions carrying it
+ */
+ public static void processMeasuresFromExprMeasures(CubeQueryContext cubeql, Set<String> measureTag,
+ Map<String, String> tagToMeasureOrExprMap) {
+ boolean isExprProcessed;
+ String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName());
+ for (String expr : cubeql.getQueriedExprsWithMeasures()) {
+ isExprProcessed = false;
+ for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias)
+ .getAllExprs()) {
+ if (esc.getTblAliasToColumns().get(cubeAlias) != null) {
+ for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) {
+ if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) {
+ /* This associates the expression with one of the dataCompletenessTags of its measures.
+ So, even if the expression is composed of measures with different dataCompletenessTags, we
+ determine the dataCompleteness from one of the measures, and the expression is grouped with
+ the other queried measures that have the same dataCompletenessTag. */
+ isExprProcessed = true;
+ break;
+ }
+ }
+ }
+ if (isExprProcessed) {
+ break;
+ }
+ }
+ }
+ }
}
+