You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pr...@apache.org on 2017/04/12 13:03:57 UTC
[14/20] lens git commit: LENS-1381: Support Fact to Fact Union
http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
index 2b63193..10c3bbe 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -18,35 +18,21 @@
*/
package org.apache.lens.cube.parse;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.incompletePartitions;
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.partitionColumnsMissing;
import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.lens.cube.metadata.DateUtil.WSPACE;
-import static org.apache.lens.cube.metadata.MetastoreUtil.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCode.*;
import org.apache.lens.cube.metadata.*;
-import org.apache.lens.cube.parse.CandidateTablePruneCause.*;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.SkipUpdatePeriodCode;
import org.apache.lens.server.api.error.LensException;
-import org.apache.lens.server.api.metastore.*;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
import lombok.extern.slf4j.Slf4j;
-
/**
* Resolve storages and partitions of all candidate tables and prunes candidate tables with missing storages or
* partitions.
@@ -57,35 +43,13 @@ class StorageTableResolver implements ContextRewriter {
private final Configuration conf;
private final List<String> supportedStorages;
private final boolean allStoragesSupported;
- CubeMetastoreClient client;
private final boolean failOnPartialData;
private final List<String> validDimTables;
- private final Map<CubeFactTable, Map<UpdatePeriod, Set<String>>> validStorageMap = new HashMap<>();
- private String processTimePartCol = null;
private final UpdatePeriod maxInterval;
+ // TODO union : Remove this. All partitions are stored in the StorageCandidate.
private final Map<String, Set<String>> nonExistingPartitions = new HashMap<>();
- private TimeRangeWriter rangeWriter;
- private DateFormat partWhereClauseFormat = null;
+ private CubeMetastoreClient client;
private PHASE phase;
- private HashMap<CubeFactTable, Map<String, SkipStorageCause>> skipStorageCausesPerFact;
- private float completenessThreshold;
- private String completenessPartCol;
-
- enum PHASE {
- FACT_TABLES, FACT_PARTITIONS, DIM_TABLE_AND_PARTITIONS;
-
- static PHASE first() {
- return values()[0];
- }
-
- static PHASE last() {
- return values()[values().length - 1];
- }
-
- PHASE next() {
- return values()[(this.ordinal() + 1) % values().length];
- }
- }
StorageTableResolver(Configuration conf) {
this.conf = conf;
@@ -94,24 +58,13 @@ class StorageTableResolver implements ContextRewriter {
this.failOnPartialData = conf.getBoolean(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, false);
String str = conf.get(CubeQueryConfUtil.VALID_STORAGE_DIM_TABLES);
validDimTables = StringUtils.isBlank(str) ? null : Arrays.asList(StringUtils.split(str.toLowerCase(), ","));
- this.processTimePartCol = conf.get(CubeQueryConfUtil.PROCESS_TIME_PART_COL);
String maxIntervalStr = conf.get(CubeQueryConfUtil.QUERY_MAX_INTERVAL);
if (maxIntervalStr != null) {
- this.maxInterval = UpdatePeriod.valueOf(maxIntervalStr);
+ this.maxInterval = UpdatePeriod.valueOf(maxIntervalStr.toUpperCase());
} else {
this.maxInterval = null;
}
- rangeWriter =
- ReflectionUtils.newInstance(conf.getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS,
- CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER, TimeRangeWriter.class), this.conf);
- String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT);
- if (formatStr != null) {
- partWhereClauseFormat = new SimpleDateFormat(formatStr);
- }
this.phase = PHASE.first();
- completenessThreshold = conf.getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD,
- CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD);
- completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL);
}
private List<String> getSupportedStorages(Configuration conf) {
@@ -122,55 +75,88 @@ class StorageTableResolver implements ContextRewriter {
return null;
}
- public boolean isStorageSupported(String storage) {
+ private boolean isStorageSupportedOnDriver(String storage) {
return allStoragesSupported || supportedStorages.contains(storage);
}
- Map<String, List<String>> storagePartMap = new HashMap<String, List<String>>();
-
@Override
public void rewriteContext(CubeQueryContext cubeql) throws LensException {
client = cubeql.getMetastoreClient();
switch (phase) {
- case FACT_TABLES:
- if (!cubeql.getCandidateFacts().isEmpty()) {
- // resolve storage table names
- resolveFactStorageTableNames(cubeql);
+ case STORAGE_TABLES:
+ if (!cubeql.getCandidates().isEmpty()) {
+ resolveStorageTable(cubeql);
}
- cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES);
break;
- case FACT_PARTITIONS:
- if (!cubeql.getCandidateFacts().isEmpty()) {
- // resolve storage partitions
- resolveFactStoragePartitions(cubeql);
- }
- cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES);
- if (client != null && client.isDataCompletenessCheckEnabled()) {
- if (!cubeql.getCandidateFacts().isEmpty()) {
- // resolve incomplete fact partition
- resolveFactCompleteness(cubeql);
- }
- cubeql.pruneCandidateFactSet(CandidateTablePruneCode.INCOMPLETE_PARTITION);
+ case STORAGE_PARTITIONS:
+ if (!cubeql.getCandidates().isEmpty()) {
+ resolveStoragePartitions(cubeql);
}
break;
case DIM_TABLE_AND_PARTITIONS:
resolveDimStorageTablesAndPartitions(cubeql);
if (cubeql.getAutoJoinCtx() != null) {
// After all candidates are pruned after storage resolver, prune join paths.
- cubeql.getAutoJoinCtx().pruneAllPaths(cubeql.getCube(), cubeql.getCandidateFacts(), null);
+ cubeql.getAutoJoinCtx()
+ .pruneAllPaths(cubeql.getCube(), CandidateUtil.getStorageCandidates(cubeql.getCandidates()), null);
cubeql.getAutoJoinCtx().pruneAllPathsForCandidateDims(cubeql.getCandidateDimTables());
cubeql.getAutoJoinCtx().refreshJoinPathColumns();
}
+ // TODO union : What is this? We may not need this, as non-existing partitions are stored in StorageCandidate
+ cubeql.setNonexistingParts(nonExistingPartitions);
break;
}
- //Doing this on all three phases. Keep updating cubeql with the current identified missing partitions.
- cubeql.setNonexistingParts(nonExistingPartitions);
phase = phase.next();
}
+ /**
+ * Each candidate in the set is a complex candidate. We will evaluate each one to get
+ * all the partitions needed to answer the query.
+ *
+ * @param cubeql cube query context
+ */
+ private void resolveStoragePartitions(CubeQueryContext cubeql) throws LensException {
+ Iterator<Candidate> candidateIterator = cubeql.getCandidates().iterator();
+ while (candidateIterator.hasNext()) {
+ Candidate candidate = candidateIterator.next();
+ boolean isComplete = true;
+ boolean isTimeRangeAnswerableByThisCandidate = true;
+ for (TimeRange range : cubeql.getTimeRanges()) {
+ if (!candidate.isTimeRangeCoverable(range)) {
+ isTimeRangeAnswerableByThisCandidate = false;
+ log.info("Not considering candidate:{} as it can not cover time range {}", candidate, range);
+ cubeql.addCandidatePruningMsg(candidate,
+ CandidateTablePruneCause.storageNotAvailableInRange(Lists.newArrayList(range)));
+ break;
+ }
+ isComplete &= candidate.evaluateCompleteness(range, range, failOnPartialData);
+ }
+ if (!isTimeRangeAnswerableByThisCandidate) {
+ candidateIterator.remove();
+ } else if (failOnPartialData && !isComplete) {
+ candidateIterator.remove();
+ log.info("Not considering candidate:{} as its data is not is not complete", candidate);
+ Set<StorageCandidate> scSet = CandidateUtil.getStorageCandidates(candidate);
+ for (StorageCandidate sc : scSet) {
+ if (!sc.getNonExistingPartitions().isEmpty()) {
+ cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.missingPartitions(sc.getNonExistingPartitions()));
+ } else if (!sc.getDataCompletenessMap().isEmpty()) {
+ cubeql.addStoragePruningMsg(sc, incompletePartitions(sc.getDataCompletenessMap()));
+ }
+ }
+ } else if (candidate.getParticipatingPartitions().isEmpty()
+ && candidate instanceof StorageCandidate
+ && ((StorageCandidate) candidate).getNonExistingPartitions().isEmpty()) {
+ candidateIterator.remove();
+ cubeql.addCandidatePruningMsg(candidate,
+ new CandidateTablePruneCause(CandidateTablePruneCode.NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE));
+ }
+ }
+ }
+
private void resolveDimStorageTablesAndPartitions(CubeQueryContext cubeql) throws LensException {
- Set<Dimension> allDims = new HashSet<Dimension>(cubeql.getDimensions());
+ Set<Dimension> allDims = new HashSet<>(cubeql.getDimensions());
for (Aliased<Dimension> dim : cubeql.getOptionalDimensions()) {
allDims.add(dim.getObject());
}
@@ -184,21 +170,23 @@ class StorageTableResolver implements ContextRewriter {
CandidateDim candidate = i.next();
CubeDimensionTable dimtable = candidate.dimtable;
if (dimtable.getStorages().isEmpty()) {
- cubeql.addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause(
- CandidateTablePruneCode.MISSING_STORAGES));
+ cubeql
+ .addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES));
i.remove();
continue;
}
- Set<String> storageTables = new HashSet<String>();
+ Set<String> storageTables = new HashSet<>();
Map<String, String> whereClauses = new HashMap<String, String>();
boolean foundPart = false;
- Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>();
+ // TODO union : We have to remove all usages of a deprecated class.
+ Map<String, CandidateTablePruneCode> skipStorageCauses = new HashMap<>();
for (String storage : dimtable.getStorages()) {
- if (isStorageSupported(storage)) {
- String tableName = getFactOrDimtableStorageTableName(dimtable.getName(), storage).toLowerCase();
+ if (isStorageSupportedOnDriver(storage)) {
+ String tableName = MetastoreUtil.getFactOrDimtableStorageTableName(dimtable.getName(),
+ storage).toLowerCase();
if (validDimTables != null && !validDimTables.contains(tableName)) {
log.info("Not considering dim storage table:{} as it is not a valid dim storage", tableName);
- skipStorageCauses.put(tableName, new SkipStorageCause(SkipStorageCode.INVALID));
+ skipStorageCauses.put(tableName, CandidateTablePruneCode.INVALID);
continue;
}
@@ -212,13 +200,12 @@ class StorageTableResolver implements ContextRewriter {
}
if (!failOnPartialData || foundPart) {
storageTables.add(tableName);
- String whereClause =
- StorageUtil.getWherePartClause(dim.getTimedDimension(), null,
- StorageConstants.getPartitionsForLatest());
+ String whereClause = StorageUtil
+ .getWherePartClause(dim.getTimedDimension(), null, StorageConstants.getPartitionsForLatest());
whereClauses.put(tableName, whereClause);
} else {
log.info("Not considering dim storage table:{} as no dim partitions exist", tableName);
- skipStorageCauses.put(tableName, new SkipStorageCause(SkipStorageCode.NO_PARTITIONS));
+ skipStorageCauses.put(tableName, CandidateTablePruneCode.NO_PARTITIONS);
}
} else {
storageTables.add(tableName);
@@ -226,7 +213,7 @@ class StorageTableResolver implements ContextRewriter {
}
} else {
log.info("Storage:{} is not supported", storage);
- skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.UNSUPPORTED));
+ skipStorageCauses.put(storage, CandidateTablePruneCode.UNSUPPORTED_STORAGE);
}
}
if (!foundPart) {
@@ -234,7 +221,8 @@ class StorageTableResolver implements ContextRewriter {
}
if (storageTables.isEmpty()) {
log.info("Not considering dim table:{} as no candidate storage tables eixst", dimtable);
- cubeql.addDimPruningMsgs(dim, dimtable, noCandidateStorages(skipStorageCauses));
+ cubeql.addDimPruningMsgs(dim, dimtable,
+ CandidateTablePruneCause.noCandidateStoragesForDimtable(skipStorageCauses));
i.remove();
continue;
}
@@ -245,619 +233,151 @@ class StorageTableResolver implements ContextRewriter {
}
}
- // Resolves all the storage table names, which are valid for each updatePeriod
- private void resolveFactStorageTableNames(CubeQueryContext cubeql) throws LensException {
- Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
- skipStorageCausesPerFact = new HashMap<>();
- while (i.hasNext()) {
- CubeFactTable fact = i.next().fact;
- if (fact.getUpdatePeriods().isEmpty()) {
- cubeql.addFactPruningMsgs(fact, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES));
- i.remove();
+ /**
+ * Following storages are removed:
+ * 1. The storage is not supported by driver.
+ * 2. The storage is not in the valid storage list.
+ * 3. The storage is not even partially valid for any time range in the query.
+ * 4. The storage has no valid update period.
+ *
+ * This method also creates a list of valid update periods and stores them into {@link StorageCandidate}.
+ *
+ * TODO union : Do fourth point before 3.
+ */
+ private void resolveStorageTable(CubeQueryContext cubeql) throws LensException {
+ Iterator<Candidate> it = cubeql.getCandidates().iterator();
+ while (it.hasNext()) {
+ Candidate c = it.next();
+ assert (c instanceof StorageCandidate);
+ StorageCandidate sc = (StorageCandidate) c;
+ String storageTable = sc.getStorageName();
+ // first check: if the storage is supported on driver
+ if (!isStorageSupportedOnDriver(storageTable)) {
+ log.info("Skipping storage: {} as it is not supported", storageTable);
+ cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.UNSUPPORTED_STORAGE));
+ it.remove();
continue;
}
- Map<UpdatePeriod, Set<String>> storageTableMap = new TreeMap<UpdatePeriod, Set<String>>();
- validStorageMap.put(fact, storageTableMap);
- String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(fact.getName()));
+ String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(sc.getFact().getName()));
List<String> validFactStorageTables =
StringUtils.isBlank(str) ? null : Arrays.asList(StringUtils.split(str.toLowerCase(), ","));
- Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>();
-
- for (Map.Entry<String, Set<UpdatePeriod>> entry : fact.getUpdatePeriods().entrySet()) {
- String storage = entry.getKey();
- // skip storages that are not supported
- if (!isStorageSupported(storage)) {
- log.info("Skipping storage: {} as it is not supported", storage);
- skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.UNSUPPORTED));
- continue;
- }
- String table = getStorageTableName(fact, storage, validFactStorageTables);
- // skip the update period if the storage is not valid
- if (table == null) {
- skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.INVALID));
- continue;
- }
- List<String> validUpdatePeriods =
- CubeQueryConfUtil.getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(fact.getName(), storage));
-
- boolean isStorageAdded = false;
- Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<String, SkipUpdatePeriodCode>();
- for (UpdatePeriod updatePeriod : entry.getValue()) {
- if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) {
- log.info("Skipping update period {} for fact {}", updatePeriod, fact);
- skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.QUERY_INTERVAL_BIGGER);
- continue;
- }
- if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) {
- log.info("Skipping update period {} for fact {} for storage {}", updatePeriod, fact, storage);
- skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID);
- continue;
- }
- Set<String> storageTables = storageTableMap.get(updatePeriod);
- if (storageTables == null) {
- storageTables = new LinkedHashSet<>();
- storageTableMap.put(updatePeriod, storageTables);
- }
- isStorageAdded = true;
- log.debug("Adding storage table:{} for fact:{} for update period {}", table, fact, updatePeriod);
- storageTables.add(table);
- }
- if (!isStorageAdded) {
- skipStorageCauses.put(storage, SkipStorageCause.noCandidateUpdatePeriod(skipUpdatePeriodCauses));
- }
- }
- skipStorageCausesPerFact.put(fact, skipStorageCauses);
- if (storageTableMap.isEmpty()) {
- log.info("Not considering fact table:{} as it does not have any storage tables", fact);
- cubeql.addFactPruningMsgs(fact, noCandidateStorages(skipStorageCauses));
- i.remove();
- }
- }
- }
-
- private TreeSet<UpdatePeriod> getValidUpdatePeriods(CubeFactTable fact) {
- TreeSet<UpdatePeriod> set = new TreeSet<UpdatePeriod>();
- set.addAll(validStorageMap.get(fact).keySet());
- return set;
- }
-
- String getStorageTableName(CubeFactTable fact, String storage, List<String> validFactStorageTables) {
- String tableName = getFactOrDimtableStorageTableName(fact.getName(), storage).toLowerCase();
- if (validFactStorageTables != null && !validFactStorageTables.contains(tableName)) {
- log.info("Skipping storage table {} as it is not valid", tableName);
- return null;
- }
- return tableName;
- }
-
- private TimeRange getFallbackRange(TimeRange range, CandidateFact cfact, CubeQueryContext cubeql)
- throws LensException {
- Cube baseCube = cubeql.getBaseCube();
- ArrayList<String> tableNames = Lists.newArrayList(cfact.fact.getName(), cubeql.getCube().getName());
- if (!cubeql.getCube().getName().equals(baseCube.getName())) {
- tableNames.add(baseCube.getName());
- }
- String fallBackString = null;
- String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn());
- for (String tableName : tableNames) {
- fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters()
- .get(MetastoreConstants.TIMEDIM_RELATION + timedim);
- if (StringUtils.isNotBlank(fallBackString)) {
- break;
- }
- }
- if (StringUtils.isBlank(fallBackString)) {
- return null;
- }
- Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, ""));
- if (!matcher.matches()) {
- return null;
- }
- DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim());
- DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim());
- String relatedTimeDim = matcher.group(1).trim();
- String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim);
- return TimeRange.getBuilder()
- .fromDate(diff2.negativeOffsetFrom(range.getFromDate()))
- .toDate(diff1.negativeOffsetFrom(range.getToDate()))
- .partitionColumn(fallbackPartCol).build();
- }
-
- private void resolveFactStoragePartitions(CubeQueryContext cubeql) throws LensException {
- // Find candidate tables wrt supported storages
- Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
- while (i.hasNext()) {
- CandidateFact cfact = i.next();
- Map<TimeRange, String> whereClauseForFallback = new LinkedHashMap<TimeRange, String>();
- List<FactPartition> answeringParts = new ArrayList<>();
- Map<String, SkipStorageCause> skipStorageCauses = skipStorageCausesPerFact.get(cfact.fact);
- if (skipStorageCauses == null) {
- skipStorageCauses = new HashMap<>();
- }
- PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns();
- boolean noPartsForRange = false;
- Set<String> unsupportedTimeDims = Sets.newHashSet();
- Set<String> partColsQueried = Sets.newHashSet();
- for (TimeRange range : cubeql.getTimeRanges()) {
- partColsQueried.add(range.getPartitionColumn());
- StringBuilder extraWhereClause = new StringBuilder();
- Set<FactPartition> rangeParts = getPartitions(cfact.fact, range, skipStorageCauses, missingParts);
- // If no partitions were found, then we'll fallback.
- String partCol = range.getPartitionColumn();
- boolean partColNotSupported = rangeParts.isEmpty();
- for (String storage : cfact.fact.getStorages()) {
- String storageTableName = getFactOrDimtableStorageTableName(cfact.fact.getName(), storage).toLowerCase();
- partColNotSupported &= skipStorageCauses.containsKey(storageTableName)
- && skipStorageCauses.get(storageTableName).getCause().equals(PART_COL_DOES_NOT_EXIST)
- && skipStorageCauses.get(storageTableName).getNonExistantPartCols().contains(partCol);
- }
- TimeRange prevRange = range;
- String sep = "";
- while (rangeParts.isEmpty()) {
- // TODO: should we add a condition whether on range's partcol any missing partitions are not there
- String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol);
- if (partColNotSupported && !cfact.getColumns().contains(timeDim)) {
- unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(range.getPartitionColumn()));
- break;
- }
- TimeRange fallBackRange = getFallbackRange(prevRange, cfact, cubeql);
- log.info("No partitions for range:{}. fallback range: {}", range, fallBackRange);
- if (fallBackRange == null) {
- break;
- }
- partColsQueried.add(fallBackRange.getPartitionColumn());
- rangeParts = getPartitions(cfact.fact, fallBackRange, skipStorageCauses, missingParts);
- extraWhereClause.append(sep)
- .append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim));
- sep = " AND ";
- prevRange = fallBackRange;
- partCol = prevRange.getPartitionColumn();
- if (!rangeParts.isEmpty()) {
- break;
- }
- }
- whereClauseForFallback.put(range, extraWhereClause.toString());
- if (rangeParts.isEmpty()) {
- log.info("No partitions for fallback range:{}", range);
- noPartsForRange = true;
- continue;
- }
- // If multiple storage tables are part of the same fact,
- // capture range->storage->partitions
- Map<String, LinkedHashSet<FactPartition>> tablePartMap = new HashMap<String, LinkedHashSet<FactPartition>>();
- for (FactPartition factPart : rangeParts) {
- for (String table : factPart.getStorageTables()) {
- if (!tablePartMap.containsKey(table)) {
- tablePartMap.put(table, new LinkedHashSet<>(Collections.singletonList(factPart)));
- } else {
- LinkedHashSet<FactPartition> storagePart = tablePartMap.get(table);
- storagePart.add(factPart);
- }
- }
- }
- cfact.getRangeToStoragePartMap().put(range, tablePartMap);
- cfact.incrementPartsQueried(rangeParts.size());
- answeringParts.addAll(rangeParts);
- cfact.getPartsQueried().addAll(rangeParts);
- }
- if (!unsupportedTimeDims.isEmpty()) {
- log.info("Not considering fact table:{} as it doesn't support time dimensions: {}", cfact.fact,
- unsupportedTimeDims);
- cubeql.addFactPruningMsgs(cfact.fact, timeDimNotSupported(unsupportedTimeDims));
- i.remove();
- continue;
- }
- Set<String> nonExistingParts = missingParts.toSet(partColsQueried);
- if (!nonExistingParts.isEmpty()) {
- addNonExistingParts(cfact.fact.getName(), nonExistingParts);
- }
- if (cfact.getNumQueriedParts() == 0 || (failOnPartialData && (noPartsForRange || !nonExistingParts.isEmpty()))) {
- log.info("Not considering fact table:{} as it could not find partition for given ranges: {}", cfact.fact,
- cubeql.getTimeRanges());
- /*
- * This fact is getting discarded because of any of following reasons:
- * 1. Has missing partitions
- * 2. All Storage tables were skipped for some reasons.
- * 3. Storage tables do not have the update period for the timerange queried.
- */
- if (failOnPartialData && !nonExistingParts.isEmpty()) {
- cubeql.addFactPruningMsgs(cfact.fact, missingPartitions(nonExistingParts));
- } else if (!skipStorageCauses.isEmpty()) {
- CandidateTablePruneCause cause = noCandidateStorages(skipStorageCauses);
- cubeql.addFactPruningMsgs(cfact.fact, cause);
- } else {
- CandidateTablePruneCause cause =
- new CandidateTablePruneCause(NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE);
- cubeql.addFactPruningMsgs(cfact.fact, cause);
- }
- i.remove();
+ storageTable = sc.getStorageTable();
+ // Check if storagetable is in the list of valid storages.
+ if (validFactStorageTables != null && !validFactStorageTables.contains(storageTable)) {
+ log.info("Skipping storage table {} as it is not valid", storageTable);
+ cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.INVALID_STORAGE));
+ it.remove();
continue;
}
- // Map from storage to covering parts
- Map<String, Set<FactPartition>> minimalStorageTables = new LinkedHashMap<String, Set<FactPartition>>();
- StorageUtil.getMinimalAnsweringTables(answeringParts, minimalStorageTables);
- if (minimalStorageTables.isEmpty()) {
- log.info("Not considering fact table:{} as it does not have any storage tables", cfact);
- cubeql.addFactPruningMsgs(cfact.fact, noCandidateStorages(skipStorageCauses));
- i.remove();
+ List<String> validUpdatePeriods = CubeQueryConfUtil
+ .getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(sc.getFact().getName(), sc.getStorageName()));
+ boolean isUpdatePeriodForStorageAdded = false;
+ Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<>();
+
+ if (cubeql.getTimeRanges().stream().noneMatch(range -> CandidateUtil.isPartiallyValidForTimeRange(sc, range))) {
+ cubeql.addStoragePruningMsg(sc,
+ new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE));
+ it.remove();
continue;
}
- Set<String> storageTables = new LinkedHashSet<>();
- storageTables.addAll(minimalStorageTables.keySet());
- cfact.setStorageTables(storageTables);
- // Update range->storage->partitions with time range where clause
- for (TimeRange trange : cfact.getRangeToStoragePartMap().keySet()) {
- Map<String, String> rangeToWhere = new HashMap<>();
- for (Map.Entry<String, Set<FactPartition>> entry : minimalStorageTables.entrySet()) {
- String table = entry.getKey();
- Set<FactPartition> minimalParts = entry.getValue();
-
- LinkedHashSet<FactPartition> rangeParts = cfact.getRangeToStoragePartMap().get(trange).get(table);
- LinkedHashSet<FactPartition> minimalPartsCopy = Sets.newLinkedHashSet();
-
- if (rangeParts != null) {
- minimalPartsCopy.addAll(minimalParts);
- minimalPartsCopy.retainAll(rangeParts);
- }
- if (!StringUtils.isEmpty(whereClauseForFallback.get(trange))) {
- rangeToWhere.put(table, "(("
- + rangeWriter.getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()),
- minimalPartsCopy) + ") and (" + whereClauseForFallback.get(trange) + "))");
- } else {
- rangeToWhere.put(table, rangeWriter.getTimeRangeWhereClause(cubeql,
- cubeql.getAliasForTableName(cubeql.getCube().getName()), minimalPartsCopy));
- }
- }
- cfact.getRangeToStorageWhereMap().put(trange, rangeToWhere);
- }
- log.info("Resolved partitions for fact {}: {} storageTables:{}", cfact, answeringParts, storageTables);
- }
- }
- private static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias,
- Set<String> measureTag,
- Map<String, String> tagToMeasureOrExprMap) {
- CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol);
- if (column != null && column.getTags() != null) {
- String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG);
- //Checking if dataCompletenessTag is set for queried measure
- if (dataCompletenessTag != null) {
- measureTag.add(dataCompletenessTag);
- String value = tagToMeasureOrExprMap.get(dataCompletenessTag);
- if (value == null) {
- tagToMeasureOrExprMap.put(dataCompletenessTag, alias);
+ // Populate valid update periods and check validity at update period level
+ for (UpdatePeriod updatePeriod : sc.getFact().getUpdatePeriods().get(sc.getStorageName())) {
+ if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) {
+ // if user supplied max interval, all intervals larger than that are useless.
+ log.info("Skipping update period {} for candidate {} since it's more than max interval supplied({})",
+ updatePeriod, sc.getStorageTable(), maxInterval);
+ skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.UPDATE_PERIOD_BIGGER_THAN_MAX);
+ } else if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) {
+ // if user supplied valid update periods, other update periods are useless
+ log.info("Skipping update period {} for candidate {} for storage {} since it's invalid",
+ updatePeriod, sc.getStorageTable(), storageTable);
+ skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID);
+ } else if (!sc.isUpdatePeriodUseful(updatePeriod)) {
+ // the storage candidate does not find this update period useful for the time ranges queried
+ skipUpdatePeriodCauses.put(updatePeriod.toString(),
+ SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD);
} else {
- value = value.concat(",").concat(alias);
- tagToMeasureOrExprMap.put(dataCompletenessTag, value);
- }
- return true;
- }
- }
- return false;
- }
-
- private static void processMeasuresFromExprMeasures(CubeQueryContext cubeql, Set<String> measureTag,
- Map<String, String> tagToMeasureOrExprMap) {
- boolean isExprProcessed;
- String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName());
- for (String expr : cubeql.getQueriedExprsWithMeasures()) {
- isExprProcessed = false;
- for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias)
- .getAllExprs()) {
- if (esc.getTblAliasToColumns().get(cubeAlias) != null) {
- for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) {
- if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) {
- /* This is done to associate the expression with one of the dataCompletenessTag for the measures.
- So, even if the expression is composed of measures with different dataCompletenessTags, we will be
- determining the dataCompleteness from one of the measure and this expression is grouped with the
- other queried measures that have the same dataCompletenessTag. */
- isExprProcessed = true;
- break;
- }
- }
- }
- if (isExprProcessed) {
- break;
+ isUpdatePeriodForStorageAdded = true;
+ sc.addValidUpdatePeriod(updatePeriod);
}
}
- }
- }
-
- private void resolveFactCompleteness(CubeQueryContext cubeql) throws LensException {
- if (client == null || client.getCompletenessChecker() == null || completenessPartCol == null) {
- return;
- }
- DataCompletenessChecker completenessChecker = client.getCompletenessChecker();
- Set<String> measureTag = new HashSet<>();
- Map<String, String> tagToMeasureOrExprMap = new HashMap<>();
-
- processMeasuresFromExprMeasures(cubeql, measureTag, tagToMeasureOrExprMap);
-
- Set<String> measures = cubeql.getQueriedMsrs();
- if (measures == null) {
- measures = new HashSet<>();
- }
- for (String measure : measures) {
- processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap);
- }
- //Checking if dataCompletenessTag is set for the fact
- if (measureTag.isEmpty()) {
- log.info("No Queried measures with the dataCompletenessTag, hence skipping the availability check");
- return;
- }
- Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
- DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
- while (i.hasNext()) {
- CandidateFact cFact = i.next();
- // Map from measure to the map from partition to %completeness
- Map<String, Map<String, Float>> incompleteMeasureData = new HashMap<>();
-
- String factDataCompletenessTag = cFact.fact.getDataCompletenessTag();
- if (factDataCompletenessTag == null) {
- log.info("Not checking completeness for the fact table:{} as the dataCompletenessTag is not set", cFact.fact);
- continue;
+ // For DEBUG purpose only to see why some update periods are skipped.
+ if (!skipUpdatePeriodCauses.isEmpty()) {
+ sc.setUpdatePeriodRejectionCause(skipUpdatePeriodCauses);
}
- boolean isFactDataIncomplete = false;
- for (TimeRange range : cubeql.getTimeRanges()) {
- if (!range.getPartitionColumn().equals(completenessPartCol)) {
- log.info("Completeness check not available for partCol:{}", range.getPartitionColumn());
- continue;
+ // if no update periods were added in previous section, we skip this storage candidate
+ if (!isUpdatePeriodForStorageAdded) {
+ if (skipUpdatePeriodCauses.values().stream().allMatch(
+ SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD::equals)) {
+ // all update periods bigger than query range, it means time range not answerable.
+ cubeql.addStoragePruningMsg(sc,
+ new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE));
+ } else { // Update periods are rejected for multiple reasons.
+ cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.updatePeriodsRejected(skipUpdatePeriodCauses));
}
- Date from = range.getFromDate();
- Date to = range.getToDate();
- Map<String, Map<Date, Float>> completenessMap = completenessChecker.getCompleteness(factDataCompletenessTag,
- from, to, measureTag);
- if (completenessMap != null && !completenessMap.isEmpty()) {
- for (Map.Entry<String, Map<Date, Float>> measureCompleteness : completenessMap.entrySet()) {
- String tag = measureCompleteness.getKey();
- for (Map.Entry<Date, Float> completenessResult : measureCompleteness.getValue().entrySet()) {
- if (completenessResult.getValue() < completenessThreshold) {
- log.info("Completeness for the measure_tag {} is {}, threshold: {}, for the hour {}", tag,
- completenessResult.getValue(), completenessThreshold,
- formatter.format(completenessResult.getKey()));
- String measureorExprFromTag = tagToMeasureOrExprMap.get(tag);
- Map<String, Float> incompletePartition = incompleteMeasureData.get(measureorExprFromTag);
- if (incompletePartition == null) {
- incompletePartition = new HashMap<>();
- incompleteMeasureData.put(measureorExprFromTag, incompletePartition);
+ it.remove();
+ } else {
+ //set the dates again as they can change based on ValidUpdatePeriod
+ sc.setStorageStartAndEndDate();
+ Set<CandidateTablePruneCause> allPruningCauses = new HashSet<>(cubeql.getTimeRanges().size());
+ for (TimeRange range : cubeql.getTimeRanges()) {
+ CandidateTablePruneCause pruningCauseForThisTimeRange = null;
+ if (!CandidateUtil.isPartiallyValidForTimeRange(sc, range)) {
+ //This is the prune cause
+ pruningCauseForThisTimeRange =
+ new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE);
+ } else if (cubeql.shouldReplaceTimeDimWithPart()) {
+ if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(), range.getPartitionColumn())) {
+ pruningCauseForThisTimeRange = partitionColumnsMissing(range.getPartitionColumn());
+ TimeRange fallBackRange = StorageUtil.getFallbackRange(range, sc.getFact().getName(), cubeql);
+ while (fallBackRange != null) {
+ pruningCauseForThisTimeRange = null;
+ if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(),
+ fallBackRange.getPartitionColumn())) {
+ pruningCauseForThisTimeRange = partitionColumnsMissing(fallBackRange.getPartitionColumn());
+ fallBackRange = StorageUtil.getFallbackRange(fallBackRange, sc.getFact().getName(), cubeql);
+ } else {
+ if (!CandidateUtil.isPartiallyValidForTimeRange(sc, fallBackRange)) {
+ pruningCauseForThisTimeRange =
+ new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE);
+ }
+ break;
}
- incompletePartition.put(formatter.format(completenessResult.getKey()), completenessResult.getValue());
- isFactDataIncomplete = true;
}
}
}
+
+ if (pruningCauseForThisTimeRange != null) {
+ allPruningCauses.add(pruningCauseForThisTimeRange);
+ }
}
- }
- if (isFactDataIncomplete) {
- log.info("Fact table:{} has partitions with incomplete data: {} for given ranges: {}", cFact.fact,
- incompleteMeasureData, cubeql.getTimeRanges());
- if (failOnPartialData) {
- i.remove();
- cubeql.addFactPruningMsgs(cFact.fact, incompletePartitions(incompleteMeasureData));
- } else {
- cFact.setDataCompletenessMap(incompleteMeasureData);
+ if (!allPruningCauses.isEmpty()) {
+ // TODO: if this storage can answer at least one time range, why prune it?
+ it.remove();
+ cubeql.addStoragePruningMsg(sc, allPruningCauses.toArray(new CandidateTablePruneCause[0]));
}
}
}
}
- void addNonExistingParts(String name, Set<String> nonExistingParts) {
+ private void addNonExistingParts(String name, Set<String> nonExistingParts) {
nonExistingPartitions.put(name, nonExistingParts);
}
- private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range,
- Map<String, SkipStorageCause> skipStorageCauses,
- PartitionRangesForPartitionColumns missingPartitions) throws LensException {
- try {
- return getPartitions(fact, range, getValidUpdatePeriods(fact), true, failOnPartialData, skipStorageCauses,
- missingPartitions);
- } catch (Exception e) {
- throw new LensException(e);
- }
- }
-
- private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range, TreeSet<UpdatePeriod> updatePeriods,
- boolean addNonExistingParts, boolean failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses,
- PartitionRangesForPartitionColumns missingPartitions)
- throws Exception {
- Set<FactPartition> partitions = new TreeSet<>();
- if (range != null && range.isCoverableBy(updatePeriods)
- && getPartitions(fact, range.getFromDate(), range.getToDate(), range.getPartitionColumn(), partitions,
- updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions)) {
- return partitions;
- } else {
- return new TreeSet<>();
- }
- }
-
- private boolean getPartitions(CubeFactTable fact, Date fromDate, Date toDate, String partCol,
- Set<FactPartition> partitions, TreeSet<UpdatePeriod> updatePeriods,
- boolean addNonExistingParts, boolean failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses,
- PartitionRangesForPartitionColumns missingPartitions)
- throws Exception {
- log.info("getPartitions for {} from fromDate:{} toDate:{}", fact, fromDate, toDate);
- if (fromDate.equals(toDate) || fromDate.after(toDate)) {
- return true;
- }
- UpdatePeriod interval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods);
- if (interval == null) {
- log.info("No max interval for range: {} to {}", fromDate, toDate);
- return false;
- }
- log.debug("Max interval for {} is: {}", fact, interval);
- Set<String> storageTbls = new LinkedHashSet<String>();
- storageTbls.addAll(validStorageMap.get(fact).get(interval));
-
- if (interval == UpdatePeriod.CONTINUOUS && rangeWriter.getClass().equals(BetweenTimeRangeWriter.class)) {
- for (String storageTbl : storageTbls) {
- FactPartition part = new FactPartition(partCol, fromDate, interval, null, partWhereClauseFormat);
- partitions.add(part);
- part.getStorageTables().add(storageTbl);
- part = new FactPartition(partCol, toDate, interval, null, partWhereClauseFormat);
- partitions.add(part);
- part.getStorageTables().add(storageTbl);
- log.info("Added continuous fact partition for storage table {}", storageTbl);
- }
- return true;
- }
-
- Iterator<String> it = storageTbls.iterator();
- while (it.hasNext()) {
- String storageTableName = it.next();
- if (!client.isStorageTableCandidateForRange(storageTableName, fromDate, toDate)) {
- skipStorageCauses.put(storageTableName, new SkipStorageCause(RANGE_NOT_ANSWERABLE));
- it.remove();
- } else if (!client.partColExists(storageTableName, partCol)) {
- log.info("{} does not exist in {}", partCol, storageTableName);
- skipStorageCauses.put(storageTableName, SkipStorageCause.partColDoesNotExist(partCol));
- it.remove();
- }
- }
-
- if (storageTbls.isEmpty()) {
- return false;
- }
- Date ceilFromDate = DateUtil.getCeilDate(fromDate, interval);
- Date floorToDate = DateUtil.getFloorDate(toDate, interval);
-
- int lookAheadNumParts =
- conf.getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(interval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS);
-
- TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, interval, 1).iterator();
- // add partitions from ceilFrom to floorTo
- while (iter.hasNext()) {
- Date dt = iter.next();
- Date nextDt = iter.peekNext();
- FactPartition part = new FactPartition(partCol, dt, interval, null, partWhereClauseFormat);
- log.debug("candidate storage tables for searching partitions: {}", storageTbls);
- updateFactPartitionStorageTablesFrom(fact, part, storageTbls);
- log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables());
- if (part.isFound()) {
- log.debug("Adding existing partition {}", part);
- partitions.add(part);
- log.debug("Looking for look ahead process time partitions for {}", part);
- if (processTimePartCol == null) {
- log.debug("processTimePartCol is null");
- } else if (partCol.equals(processTimePartCol)) {
- log.debug("part column is process time col");
- } else if (updatePeriods.first().equals(interval)) {
- log.debug("Update period is the least update period");
- } else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) {
- // see if this is the part of the last-n look ahead partitions
- log.debug("Not a look ahead partition");
- } else {
- log.debug("Looking for look ahead process time partitions for {}", part);
- // check if finer partitions are required
- // final partitions are required if no partitions from
- // look-ahead
- // process time are present
- TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts,
- interval, 1).iterator();
- while (processTimeIter.hasNext()) {
- Date pdt = processTimeIter.next();
- Date nextPdt = processTimeIter.peekNext();
- FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, interval, null,
- partWhereClauseFormat);
- updateFactPartitionStorageTablesFrom(fact, processTimePartition,
- part.getStorageTables());
- if (processTimePartition.isFound()) {
- log.debug("Finer parts not required for look-ahead partition :{}", part);
- } else {
- log.debug("Looked ahead process time partition {} is not found", processTimePartition);
- TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
- newset.addAll(updatePeriods);
- newset.remove(interval);
- log.debug("newset of update periods:{}", newset);
- if (!newset.isEmpty()) {
- // Get partitions for look ahead process time
- log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt);
- Set<FactPartition> processTimeParts =
- getPartitions(fact, TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn(
- processTimePartCol).build(), newset, true, false, skipStorageCauses, missingPartitions);
- log.debug("Look ahead partitions: {}", processTimeParts);
- TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build();
- for (FactPartition pPart : processTimeParts) {
- log.debug("Looking for finer partitions in pPart: {}", pPart);
- for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) {
- FactPartition innerPart = new FactPartition(partCol, date, pPart.getPeriod(), pPart,
- partWhereClauseFormat);
- updateFactPartitionStorageTablesFrom(fact, innerPart, pPart);
- if (innerPart.isFound()) {
- partitions.add(innerPart);
- }
- }
- log.debug("added all sub partitions blindly in pPart: {}", pPart);
- }
- }
- }
- }
- }
- } else {
- log.info("Partition:{} does not exist in any storage table", part);
- TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
- newset.addAll(updatePeriods);
- newset.remove(interval);
- if (!getPartitions(fact, dt, nextDt, partCol, partitions, newset, false, failOnPartialData, skipStorageCauses,
- missingPartitions)) {
+ enum PHASE {
+ STORAGE_TABLES, STORAGE_PARTITIONS, DIM_TABLE_AND_PARTITIONS;
- log.debug("Adding non existing partition {}", part);
- if (addNonExistingParts) {
- // Add non existing partitions for all cases of whether we populate all non existing or not.
- missingPartitions.add(part);
- if (!failOnPartialData) {
- Set<String> st = getStorageTablesWithoutPartCheck(part, storageTbls);
- if (st.isEmpty()) {
- log.info("No eligible storage tables");
- return false;
- }
- partitions.add(part);
- part.getStorageTables().addAll(st);
- }
- } else {
- log.info("No finer granual partitions exist for {}", part);
- return false;
- }
- } else {
- log.debug("Finer granual partitions added for {}", part);
- }
- }
+ static PHASE first() {
+ return values()[0];
}
- return getPartitions(fact, fromDate, ceilFromDate, partCol, partitions,
- updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions)
- && getPartitions(fact, floorToDate, toDate, partCol, partitions,
- updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions);
- }
- private Set<String> getStorageTablesWithoutPartCheck(FactPartition part,
- Set<String> storageTableNames) throws LensException, HiveException {
- Set<String> validStorageTbls = new HashSet<>();
- for (String storageTableName : storageTableNames) {
- // skip all storage tables for which are not eligible for this partition
- if (client.isStorageTablePartitionACandidate(storageTableName, part.getPartSpec())) {
- validStorageTbls.add(storageTableName);
- } else {
- log.info("Skipping {} as it is not valid for part {}", storageTableName, part.getPartSpec());
- }
+ static PHASE last() {
+ return values()[values().length - 1];
}
- return validStorageTbls;
- }
- private void updateFactPartitionStorageTablesFrom(CubeFactTable fact,
- FactPartition part, Set<String> storageTableNames) throws LensException, HiveException, ParseException {
- for (String storageTableName : storageTableNames) {
- // skip all storage tables for which are not eligible for this partition
- if (client.isStorageTablePartitionACandidate(storageTableName, part.getPartSpec())
- && (client.factPartitionExists(fact, part, storageTableName))) {
- part.getStorageTables().add(storageTableName);
- part.setFound(true);
- }
+ PHASE next() {
+ return values()[(this.ordinal() + 1) % values().length];
}
}
-
- private void updateFactPartitionStorageTablesFrom(CubeFactTable fact,
- FactPartition part, FactPartition pPart) throws LensException, HiveException, ParseException {
- updateFactPartitionStorageTablesFrom(fact, part, pPart.getStorageTables());
- part.setFound(part.isFound() && pPart.isFound());
- }
}
http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
index f9636d1..f5cd540 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -18,13 +18,19 @@
*/
package org.apache.lens.cube.parse;
+import static org.apache.lens.cube.metadata.DateUtil.WSPACE;
+
import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
-import org.apache.lens.cube.metadata.FactPartition;
-import org.apache.lens.cube.metadata.StorageConstants;
+import org.apache.lens.cube.metadata.*;
+import org.apache.lens.server.api.error.LensException;
import org.apache.commons.lang.StringUtils;
+import com.google.common.collect.Lists;
+
public final class StorageUtil {
private StorageUtil() {
@@ -69,8 +75,8 @@ public final class StorageUtil {
String sep = "";
for (String timePartCol : timedDimensions) {
if (!timePartCol.equals(partCol)) {
- sb.append(sep).append(alias).append(".").append(timePartCol)
- .append(" != '").append(StorageConstants.LATEST_PARTITION_VALUE).append("'");
+ sb.append(sep).append(alias).append(".").append(timePartCol).append(" != '")
+ .append(StorageConstants.LATEST_PARTITION_VALUE).append("'");
sep = " AND ";
}
}
@@ -82,15 +88,11 @@ public final class StorageUtil {
String sep = "((";
for (String clause : clauses) {
if (clause != null && !clause.isEmpty()) {
- sb
- .append(sep)
- .append(clause);
+ sb.append(sep).append(clause);
sep = ") AND (";
}
}
- return sb
- .append(sep.equals("((") ? "" : "))")
- .toString();
+ return sb.append(sep.equals("((") ? "" : "))").toString();
}
/**
@@ -161,4 +163,112 @@ public final class StorageUtil {
return null;
}
}
+
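+ /**
+ * Get the fallback range of a time range, as configured by the TIMEDIM_RELATION table parameter.
+ *
+ * @param range time range whose partition column identifies the time dimension to look up
+ * @param factName fact table checked first for the relation, before the queried cube and the base cube
+ * @param cubeql query context supplying the cubes and the metastore client
+ * @return the range on the related time dimension's partition column, or null if no usable relation is set
+ * @throws LensException
+ */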
+ public static TimeRange getFallbackRange(TimeRange range, String factName, CubeQueryContext cubeql)
+ throws LensException {
+ Cube baseCube = cubeql.getBaseCube();
+ ArrayList<String> tableNames = Lists.newArrayList(factName, cubeql.getCube().getName());
+ if (!cubeql.getCube().getName().equals(baseCube.getName())) {
+ tableNames.add(baseCube.getName());
+ }
+ String fallBackString = null;
+ String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn());
+ for (String tableName : tableNames) {
+ fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters()
+ .get(MetastoreConstants.TIMEDIM_RELATION + timedim);
+ if (StringUtils.isNotBlank(fallBackString)) {
+ break;
+ }
+ }
+ if (StringUtils.isBlank(fallBackString)) {
+ return null;
+ }
+ Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, ""));
+ if (!matcher.matches()) {
+ return null;
+ }
+ DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim());
+ DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim());
+ String relatedTimeDim = matcher.group(1).trim();
+ String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim);
+ return TimeRange.getBuilder().fromDate(diff2.negativeOffsetFrom(range.getFromDate()))
+ .toDate(diff1.negativeOffsetFrom(range.getToDate())).partitionColumn(fallbackPartCol).build();
+ }
+
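+ /**
+ * Records the data completeness tag of a cube measure, if the measure has one.
+ * See this: {@link org.apache.lens.server.api.metastore.DataCompletenessChecker}
+ *
+ * @param cubeql query context whose cube is searched for the measure
+ * @param cubeCol name of the measure to look up in the cube
+ * @param alias measure or expression name to associate with the tag
+ * @param measureTag set that collects the data completeness tags found
+ * @param tagToMeasureOrExprMap map from tag to a comma separated list of the aliases recorded for it
+ * @return true if the measure exists and carries a data completeness tag, false otherwise
+ */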
+ public static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias,
+ Set<String> measureTag, Map<String, String> tagToMeasureOrExprMap) {
+ CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol);
+ if (column != null && column.getTags() != null) {
+ String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG);
+ //Checking if dataCompletenessTag is set for queried measure
+ if (dataCompletenessTag != null) {
+ measureTag.add(dataCompletenessTag);
+ String value = tagToMeasureOrExprMap.get(dataCompletenessTag);
+ if (value == null) {
+ tagToMeasureOrExprMap.put(dataCompletenessTag, alias);
+ } else {
+ value = value.concat(",").concat(alias);
+ tagToMeasureOrExprMap.put(dataCompletenessTag, value);
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
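+ /**
+ * Goes through the columns used by each queried expression and associates the expression with the
+ * data completeness tag of the first such column that has one.
+ *
+ * @param cubeql query context supplying the queried expressions and their expression contexts
+ * @param measureTag set that collects the data completeness tags found
+ * @param tagToMeasureOrExprMap map from tag to the measures or expressions recorded for it
+ */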
+ public static void processExpressionsForCompleteness(CubeQueryContext cubeql, Set<String> measureTag,
+ Map<String, String> tagToMeasureOrExprMap) {
+ boolean isExprProcessed;
+ String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName());
+ for (String expr : cubeql.getQueriedExprsWithMeasures()) {
+ isExprProcessed = false;
+ for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias)
+ .getAllExprs()) {
+ if (esc.getTblAliasToColumns().get(cubeAlias) != null) {
+ for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) {
+ if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) {
+ /* This is done to associate the expression with one of the dataCompletenessTag for the measures.
+ So, even if the expression is composed of measures with different dataCompletenessTags, we will be
+ determining the dataCompleteness from one of the measure and this expression is grouped with the
+ other queried measures that have the same dataCompletenessTag. */
+ isExprProcessed = true;
+ break;
+ }
+ }
+ }
+ if (isExprProcessed) {
+ break;
+ }
+ }
+ }
+ }
}
+
http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java
deleted file mode 100644
index f18ae36..0000000
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.cube.parse;
-
-import static org.apache.hadoop.hive.ql.parse.HiveParser.*;
-
-import java.util.*;
-
-import org.apache.lens.cube.error.ColUnAvailableInTimeRange;
-import org.apache.lens.cube.error.ColUnAvailableInTimeRangeException;
-import org.apache.lens.cube.error.LensCubeErrorCode;
-import org.apache.lens.cube.metadata.*;
-import org.apache.lens.cube.metadata.join.JoinPath;
-import org.apache.lens.cube.parse.join.AutoJoinContext;
-import org.apache.lens.server.api.LensConfConstants;
-import org.apache.lens.server.api.error.LensException;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-
-import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class TimeRangeChecker implements ContextRewriter {
- public TimeRangeChecker(Configuration conf) {
- }
- @Override
- public void rewriteContext(CubeQueryContext cubeql) throws LensException {
- if (cubeql.getCube() == null) {
- return;
- }
- doColLifeValidation(cubeql);
- doFactRangeValidation(cubeql);
- }
- private void extractTimeRange(CubeQueryContext cubeql) throws LensException {
- // get time range -
- // Time range should be direct child of where condition
- // TOK_WHERE.TOK_FUNCTION.Identifier Or, it should be right hand child of
- // AND condition TOK_WHERE.KW_AND.TOK_FUNCTION.Identifier
- if (cubeql.getWhereAST() == null || cubeql.getWhereAST().getChildCount() < 1) {
- throw new LensException(LensCubeErrorCode.NO_TIMERANGE_FILTER.getLensErrorInfo());
- }
- searchTimeRanges(cubeql.getWhereAST(), cubeql, null, 0);
- }
-
- private void searchTimeRanges(ASTNode root, CubeQueryContext cubeql, ASTNode parent, int childIndex)
- throws LensException {
- if (root == null) {
- return;
- } else if (root.getToken().getType() == TOK_FUNCTION) {
- ASTNode fname = HQLParser.findNodeByPath(root, Identifier);
- if (fname != null && CubeQueryContext.TIME_RANGE_FUNC.equalsIgnoreCase(fname.getText())) {
- processTimeRangeFunction(cubeql, root, parent, childIndex);
- }
- } else {
- for (int i = 0; i < root.getChildCount(); i++) {
- ASTNode child = (ASTNode) root.getChild(i);
- searchTimeRanges(child, cubeql, root, i);
- }
- }
- }
-
- private String getColumnName(ASTNode node) {
- String column = null;
- if (node.getToken().getType() == DOT) {
- ASTNode colIdent = (ASTNode) node.getChild(1);
- column = colIdent.getText().toLowerCase();
- } else if (node.getToken().getType() == TOK_TABLE_OR_COL) {
- // Take child ident.totext
- ASTNode ident = (ASTNode) node.getChild(0);
- column = ident.getText().toLowerCase();
- }
- return column;
- }
-
- private void processTimeRangeFunction(CubeQueryContext cubeql, ASTNode timenode, ASTNode parent, int childIndex)
- throws LensException {
- TimeRange.TimeRangeBuilder builder = TimeRange.getBuilder();
- builder.astNode(timenode);
- builder.parent(parent);
- builder.childIndex(childIndex);
-
- String timeDimName = getColumnName((ASTNode) timenode.getChild(1));
-
- if (!cubeql.getCube().getTimedDimensions().contains(timeDimName)) {
- throw new LensException(LensCubeErrorCode.NOT_A_TIMED_DIMENSION.getLensErrorInfo(), timeDimName);
- }
- // Replace timeDimName with column which is used for partitioning. Assume
- // the same column
- // is used as a partition column in all storages of the fact
- timeDimName = cubeql.getPartitionColumnOfTimeDim(timeDimName);
- builder.partitionColumn(timeDimName);
-
- String fromDateRaw = PlanUtils.stripQuotes(timenode.getChild(2).getText());
- String toDateRaw = null;
- if (timenode.getChildCount() > 3) {
- ASTNode toDateNode = (ASTNode) timenode.getChild(3);
- if (toDateNode != null) {
- toDateRaw = PlanUtils.stripQuotes(timenode.getChild(3).getText());
- }
- }
- long currentTime = cubeql.getConf().getLong(LensConfConstants.QUERY_CURRENT_TIME_IN_MILLIS, 0);
- Date now;
- if (currentTime != 0) {
- now = new Date(currentTime);
- } else {
- now = new Date();
- }
- builder.fromDate(DateUtil.resolveDate(fromDateRaw, now));
- if (StringUtils.isNotBlank(toDateRaw)) {
- builder.toDate(DateUtil.resolveDate(toDateRaw, now));
- } else {
- builder.toDate(now);
- }
-
- TimeRange range = builder.build();
- range.validate();
- cubeql.getTimeRanges().add(range);
- }
-
- private void doColLifeValidation(CubeQueryContext cubeql) throws LensException,
- ColUnAvailableInTimeRangeException {
- Set<String> cubeColumns = cubeql.getColumnsQueriedForTable(cubeql.getCube().getName());
- if (cubeColumns == null || cubeColumns.isEmpty()) {
- // Query doesn't have any columns from cube
- return;
- }
-
- for (String col : cubeql.getColumnsQueriedForTable(cubeql.getCube().getName())) {
- CubeColumn column = cubeql.getCube().getColumnByName(col);
- for (TimeRange range : cubeql.getTimeRanges()) {
- if (column == null) {
- if (!cubeql.getCube().getTimedDimensions().contains(col)) {
- throw new LensException(LensCubeErrorCode.NOT_A_CUBE_COLUMN.getLensErrorInfo(), col);
- }
- continue;
- }
- if (!column.isColumnAvailableInTimeRange(range)) {
- throwException(column);
- }
- }
- }
-
- // Remove join paths that have columns with invalid life span
- AutoJoinContext joinContext = cubeql.getAutoJoinCtx();
- if (joinContext == null) {
- return;
- }
- // Get cube columns which are part of join chain
- Set<String> joinColumns = joinContext.getAllJoinPathColumnsOfTable((AbstractCubeTable) cubeql.getCube());
- if (joinColumns == null || joinColumns.isEmpty()) {
- return;
- }
-
- // Loop over all cube columns part of join paths
- for (String col : joinColumns) {
- CubeColumn column = cubeql.getCube().getColumnByName(col);
- for (TimeRange range : cubeql.getTimeRanges()) {
- if (!column.isColumnAvailableInTimeRange(range)) {
- log.info("Timerange queried is not in column life for {}, Removing join paths containing the column", column);
- // Remove join paths containing this column
- Map<Aliased<Dimension>, List<JoinPath>> allPaths = joinContext.getAllPaths();
-
- for (Aliased<Dimension> dimension : allPaths.keySet()) {
- List<JoinPath> joinPaths = allPaths.get(dimension);
- Iterator<JoinPath> joinPathIterator = joinPaths.iterator();
-
- while (joinPathIterator.hasNext()) {
- JoinPath path = joinPathIterator.next();
- if (path.containsColumnOfTable(col, (AbstractCubeTable) cubeql.getCube())) {
- log.info("Removing join path: {} as columns :{} is not available in the range", path, col);
- joinPathIterator.remove();
- if (joinPaths.isEmpty()) {
- // This dimension doesn't have any paths left
- throw new LensException(LensCubeErrorCode.NO_JOIN_PATH.getLensErrorInfo(),
- "No valid join path available for dimension " + dimension + " which would satisfy time range "
- + range.getFromDate() + "-" + range.getToDate());
- }
- }
- } // End loop to remove path
-
- } // End loop for all paths
- }
- } // End time range loop
- } // End column loop
- }
-
-
- private void throwException(CubeColumn column) throws ColUnAvailableInTimeRangeException {
-
- final Long availabilityStartTime = (column.getStartTimeMillisSinceEpoch().isPresent())
- ? column.getStartTimeMillisSinceEpoch().get() : null;
-
- final Long availabilityEndTime = column.getEndTimeMillisSinceEpoch().isPresent()
- ? column.getEndTimeMillisSinceEpoch().get() : null;
-
- ColUnAvailableInTimeRange col = new ColUnAvailableInTimeRange(column.getName(), availabilityStartTime,
- availabilityEndTime);
-
- throw new ColUnAvailableInTimeRangeException(col);
- }
-
- private void doFactRangeValidation(CubeQueryContext cubeql) {
- Iterator<CandidateFact> iter = cubeql.getCandidateFacts().iterator();
- while (iter.hasNext()) {
- CandidateFact cfact = iter.next();
- List<TimeRange> invalidTimeRanges = Lists.newArrayList();
- for (TimeRange timeRange : cubeql.getTimeRanges()) {
- if (!cfact.isValidForTimeRange(timeRange)) {
- invalidTimeRanges.add(timeRange);
- }
- }
- if (!invalidTimeRanges.isEmpty()){
- cubeql.addFactPruningMsgs(cfact.fact, CandidateTablePruneCause.factNotAvailableInRange(invalidTimeRanges));
- log.info("Not considering {} as it's not available for time ranges: {}", cfact, invalidTimeRanges);
- iter.remove();
- }
- }
- cubeql.pruneCandidateFactSet(CandidateTablePruneCause.CandidateTablePruneCode.FACT_NOT_AVAILABLE_IN_RANGE);
- }
-}
http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java
new file mode 100644
index 0000000..7f07dbc
--- /dev/null
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.cube.parse;
+
+import java.util.*;
+
+import org.apache.lens.cube.metadata.FactPartition;
+import org.apache.lens.cube.metadata.TimeRange;
+import org.apache.lens.server.api.error.LensException;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Represents a union of two or more child candidates
+ */
+public class UnionCandidate implements Candidate {
+
+ /**
+ * Caching start and end time calculated for this candidate as it may have many child candidates.
+ */
+ Date startTime = null;
+ Date endTime = null;
+ String toStr;
+ CubeQueryContext cubeql;
+ /**
+ * List of child candidates that will be union-ed
+ */
+ private List<Candidate> childCandidates;
+ private QueryAST queryAst;
+ private Map<TimeRange, Map<Candidate, TimeRange>> splitTimeRangeMap = Maps.newHashMap();
+ public UnionCandidate(List<Candidate> childCandidates, CubeQueryContext cubeql) {
+ this.childCandidates = childCandidates;
+ //this.alias = alias;
+ this.cubeql = cubeql;
+ }
+
+ @Override
+ public Set<Integer> getAnswerableMeasurePhraseIndices() {
+ // All children in the UnionCandidate have the same queryable measures, so the first child is used
+ return getChildren().iterator().next().getAnswerableMeasurePhraseIndices();
+ }
+
+ @Override
+ public boolean isTimeRangeCoverable(TimeRange timeRange) throws LensException {
+ Map<Candidate, TimeRange> candidateRange = getTimeRangeSplit(timeRange);
+ for (Map.Entry<Candidate, TimeRange> entry : candidateRange.entrySet()) {
+ if (!entry.getKey().isTimeRangeCoverable(entry.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public Collection<String> getColumns() {
+ // In UnionCandidate all columns are same, return the columns
+ // of first child
+ return childCandidates.iterator().next().getColumns();
+ }
+
+ @Override
+ public Date getStartTime() {
+ //Note: concurrent calls not handled specifically (This should not be a problem even if we do
+ //get concurrent calls).
+
+ if (startTime == null) {
+ Date minStartTime = childCandidates.get(0).getStartTime();
+ for (Candidate child : childCandidates) {
+ if (child.getStartTime().before(minStartTime)) {
+ minStartTime = child.getStartTime();
+ }
+ }
+ startTime = minStartTime;
+ }
+ return startTime;
+ }
+
+ @Override
+ public Date getEndTime() {
+ if (endTime == null) {
+ Date maxEndTime = childCandidates.get(0).getEndTime();
+ for (Candidate child : childCandidates) {
+ if (child.getEndTime().after(maxEndTime)) {
+ maxEndTime = child.getEndTime();
+ }
+ }
+ endTime = maxEndTime;
+ }
+ return endTime;
+ }
+
+ @Override
+ public double getCost() {
+ double cost = 0.0;
+ for (TimeRange timeRange : cubeql.getTimeRanges()) {
+ for (Map.Entry<Candidate, TimeRange> entry : getTimeRangeSplit(timeRange).entrySet()) {
+ cost += entry.getKey().getCost() * entry.getValue().milliseconds() / timeRange.milliseconds();
+ }
+ }
+ return cost;
+ }
+
+ @Override
+ public boolean contains(Candidate candidate) {
+ if (this.equals(candidate)) {
+ return true;
+ }
+ for (Candidate child : childCandidates) {
+ if (child.contains((candidate))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Collection<Candidate> getChildren() {
+ return childCandidates;
+ }
+
+ /**
+ * @param timeRange
+ * @return
+ */
+ @Override
+ public boolean evaluateCompleteness(TimeRange timeRange, TimeRange parentTimeRange, boolean failOnPartialData)
+ throws LensException {
+ Map<Candidate, TimeRange> candidateRange = getTimeRangeSplit(timeRange);
+ boolean ret = true;
+ for (Map.Entry<Candidate, TimeRange> entry : candidateRange.entrySet()) {
+ ret &= entry.getKey().evaluateCompleteness(entry.getValue(), parentTimeRange, failOnPartialData);
+ }
+ return ret;
+ }
+
+ @Override
+ public Set<FactPartition> getParticipatingPartitions() {
+ Set<FactPartition> factPartitionSet = new HashSet<>();
+ for (Candidate c : childCandidates) {
+ factPartitionSet.addAll(c.getParticipatingPartitions());
+ }
+ return factPartitionSet;
+ }
+
+ @Override
+ public boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr) {
+ for (Candidate cand : childCandidates) {
+ if (!cand.isExpressionEvaluable(expr)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ if (this.toStr == null) {
+ this.toStr = getToString();
+ }
+ return this.toStr;
+ }
+
+ private String getToString() {
+ StringBuilder builder = new StringBuilder(10 * childCandidates.size());
+ builder.append("UNION[");
+ for (Candidate candidate : childCandidates) {
+ builder.append(candidate.toString());
+ builder.append(", ");
+ }
+ builder.delete(builder.length() - 2, builder.length());
+ builder.append("]");
+ return builder.toString();
+ }
+
+ /**
+ * Splits the parent time range for each candidate.
+ * The candidates are sorted based on their costs.
+ *
+ * @param timeRange
+ * @return
+ */
+ private Map<Candidate, TimeRange> splitTimeRangeForChildren(TimeRange timeRange) {
+ childCandidates.sort(Comparator.comparing(Candidate::getCost));
+ Map<Candidate, TimeRange> childrenTimeRangeMap = new HashMap<>();
+ // childCandidates is now sorted by ascending cost; each child below claims its part of the remaining ranges.
+ Set<TimeRange> ranges = new HashSet<>();
+ ranges.add(timeRange);
+ for (Candidate c : childCandidates) {
+ TimeRange.TimeRangeBuilder builder = getClonedBuiler(timeRange);
+ TimeRange tr = resolveTimeRangeForChildren(c, ranges, builder);
+ if (tr != null) {
+ // If the time range is not null it means this child candidate is valid for this union candidate.
+ childrenTimeRangeMap.put(c, tr);
+ }
+ }
+ return childrenTimeRangeMap;
+ }
+ private Map<Candidate, TimeRange> getTimeRangeSplit(TimeRange range) {
+ return splitTimeRangeMap.computeIfAbsent(range, this::splitTimeRangeForChildren);
+ }
+
+ /**
+ * Resolves the time range for this candidate based on overlap.
+ *
+ * @param candidate : Candidate for which the time range is to be calculated
+ * @param ranges : Set of time ranges from which one has to be chosen.
+ * @param builder : TimeRange builder created by the common AST.
+ * @return Calculated timeRange for the candidate. If it returns null then there is no suitable time range split for
+ * this candidate. This is the correct behaviour because a union candidate can have non-participating child
+ * candidates for the parent time range.
+ */
+ private TimeRange resolveTimeRangeForChildren(Candidate candidate, Set<TimeRange> ranges,
+ TimeRange.TimeRangeBuilder builder) {
+ Iterator<TimeRange> it = ranges.iterator();
+ Set<TimeRange> newTimeRanges = new HashSet<>();
+ TimeRange ret = null;
+ while (it.hasNext()) {
+ TimeRange range = it.next();
+ // Check for out of range
+ if (candidate.getStartTime().getTime() >= range.getToDate().getTime() || candidate.getEndTime().getTime() <= range
+ .getFromDate().getTime()) {
+ continue;
+ }
+ // This means overlap.
+ if (candidate.getStartTime().getTime() <= range.getFromDate().getTime()) {
+ // Start time of the new time range will be range.getFromDate()
+ builder.fromDate(range.getFromDate());
+ if (candidate.getEndTime().getTime() <= range.getToDate().getTime()) {
+ // End time falls within the range, so it is equal to candidate.getEndTime().
+ builder.toDate(candidate.getEndTime());
+ } else {
+ // End time will be range.getToDate()
+ builder.toDate(range.getToDate());
+ }
+ } else {
+ builder.fromDate(candidate.getStartTime());
+ if (candidate.getEndTime().getTime() <= range.getToDate().getTime()) {
+ builder.toDate(candidate.getEndTime());
+ } else {
+ builder.toDate(range.getToDate());
+ }
+ }
+ // Remove the time range and add more time ranges.
+ it.remove();
+ ret = builder.build();
+ if (ret.getFromDate().getTime() == range.getFromDate().getTime()) {
+ checkAndUpdateNewTimeRanges(ret, range, newTimeRanges);
+ } else {
+ TimeRange.TimeRangeBuilder b1 = getClonedBuiler(ret);
+ b1.fromDate(range.getFromDate());
+ b1.toDate(ret.getFromDate());
+ newTimeRanges.add(b1.build());
+ checkAndUpdateNewTimeRanges(ret, range, newTimeRanges);
+
+ }
+ break;
+ }
+ ranges.addAll(newTimeRanges);
+ return ret;
+ }
+
+ private void checkAndUpdateNewTimeRanges(TimeRange ret, TimeRange range, Set<TimeRange> newTimeRanges) {
+ if (ret.getToDate().getTime() < range.getToDate().getTime()) {
+ TimeRange.TimeRangeBuilder b2 = getClonedBuiler(ret);
+ b2.fromDate(ret.getToDate());
+ b2.toDate(range.getToDate());
+ newTimeRanges.add(b2.build());
+ }
+ }
+
+ private TimeRange.TimeRangeBuilder getClonedBuiler(TimeRange timeRange) {
+ TimeRange.TimeRangeBuilder builder = new TimeRange.TimeRangeBuilder();
+ builder.astNode(timeRange.getAstNode());
+ builder.childIndex(timeRange.getChildIndex());
+ builder.parent(timeRange.getParent());
+ builder.partitionColumn(timeRange.getPartitionColumn());
+ return builder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java
deleted file mode 100644
index e6ee989..0000000
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.lens.cube.parse;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.lens.server.api.error.LensException;
-
-import org.apache.commons.lang.NotImplementedException;
-
-import lombok.AllArgsConstructor;
-import lombok.RequiredArgsConstructor;
-
-@AllArgsConstructor
-@RequiredArgsConstructor
-public abstract class UnionHQLContext extends SimpleHQLContext {
- protected final CubeQueryContext query;
- protected final CandidateFact fact;
-
- List<HQLContextInterface> hqlContexts = new ArrayList<>();
-
- public void setHqlContexts(List<HQLContextInterface> hqlContexts) throws LensException {
- this.hqlContexts = hqlContexts;
- StringBuilder queryParts = new StringBuilder("(");
- String sep = "";
- for (HQLContextInterface ctx : hqlContexts) {
- queryParts.append(sep).append(ctx.toHQL());
- sep = " UNION ALL ";
- }
- setFrom(queryParts.append(") ").append(query.getCube().getName()).toString());
- }
-
- @Override
- public String getWhere() {
- throw new NotImplementedException("Not Implemented");
- }
-}