You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pu...@apache.org on 2016/12/21 12:06:09 UTC
[1/3] lens git commit: Initial changes for Union
Repository: lens
Updated Branches:
refs/heads/lens-1381 [created] b6f0cc3d4
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java
index 89b50f5..fe867c7 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java
@@ -42,6 +42,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TimeRangeChecker implements ContextRewriter {
public TimeRangeChecker(Configuration conf) {
+
}
@Override
public void rewriteContext(CubeQueryContext cubeql) throws LensException {
@@ -49,7 +50,6 @@ public class TimeRangeChecker implements ContextRewriter {
return;
}
doColLifeValidation(cubeql);
- doFactRangeValidation(cubeql);
}
private void extractTimeRange(CubeQueryContext cubeql) throws LensException {
// get time range -
@@ -137,6 +137,7 @@ public class TimeRangeChecker implements ContextRewriter {
cubeql.getTimeRanges().add(range);
}
+ //TODO union: This can be executed before finding CoveringSets but after denormresolver and joinresolver
private void doColLifeValidation(CubeQueryContext cubeql) throws LensException,
ColUnAvailableInTimeRangeException {
Set<String> cubeColumns = cubeql.getColumnsQueriedForTable(cubeql.getCube().getName());
@@ -222,7 +223,6 @@ public class TimeRangeChecker implements ContextRewriter {
} // End column loop
}
-
private void throwException(CubeColumn column) throws ColUnAvailableInTimeRangeException {
final Long availabilityStartTime = (column.getStartTimeMillisSinceEpoch().isPresent())
@@ -236,23 +236,4 @@ public class TimeRangeChecker implements ContextRewriter {
throw new ColUnAvailableInTimeRangeException(col);
}
-
- private void doFactRangeValidation(CubeQueryContext cubeql) {
- Iterator<CandidateFact> iter = cubeql.getCandidateFacts().iterator();
- while (iter.hasNext()) {
- CandidateFact cfact = iter.next();
- List<TimeRange> invalidTimeRanges = Lists.newArrayList();
- for (TimeRange timeRange : cubeql.getTimeRanges()) {
- if (!cfact.isValidForTimeRange(timeRange)) {
- invalidTimeRanges.add(timeRange);
- }
- }
- if (!invalidTimeRanges.isEmpty()){
- cubeql.addFactPruningMsgs(cfact.fact, CandidateTablePruneCause.factNotAvailableInRange(invalidTimeRanges));
- log.info("Not considering {} as it's not available for time ranges: {}", cfact, invalidTimeRanges);
- iter.remove();
- }
- }
- cubeql.pruneCandidateFactSet(CandidateTablePruneCause.CandidateTablePruneCode.FACT_NOT_AVAILABLE_IN_RANGE);
- }
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java
new file mode 100644
index 0000000..ce28b7e
--- /dev/null
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.cube.parse;
+
+import java.util.*;
+
+import org.apache.lens.cube.metadata.FactPartition;
+import org.apache.lens.cube.metadata.TimeRange;
+import org.apache.lens.server.api.error.LensException;
+
+import lombok.Getter;
+
+/**
+ * Represents a union of child candidates. When evaluating completeness, the queried time range is split
+ * among the children (cheapest first) according to each child's start and end time.
+ */
+public class UnionCandidate implements Candidate {
+
+  /**
+   * Caching start and end time calculated for this candidate as it may have many child candidates.
+   */
+  Date startTime = null;
+  Date endTime = null;
+  /** Cached result of {@link #toString()}. */
+  String toStr;
+  @Getter
+  String alias;
+  /**
+   * List of child candidates that will be union-ed.
+   */
+  private List<Candidate> childCandidates;
+
+  /**
+   * @param childCandidates candidates whose results are union-ed; the list is kept by reference, not copied
+   * @param alias           alias of this candidate
+   */
+  public UnionCandidate(List<Candidate> childCandidates, String alias) {
+    this.childCandidates = childCandidates;
+    this.alias = alias;
+  }
+
+  // TODO union: not implemented yet, returns null.
+  @Override
+  public String toHQL() {
+    return null;
+  }
+
+  // TODO union: not implemented yet, returns null.
+  @Override
+  public QueryAST getQueryAst() {
+    return null;
+  }
+
+  // TODO union: not implemented yet, returns null.
+  @Override
+  public Collection<String> getColumns() {
+    return null;
+  }
+
+  /**
+   * @return the earliest start time among the child candidates, computed on first call and cached
+   */
+  @Override
+  public Date getStartTime() {
+    // Concurrent calls are not guarded; at worst the value is computed more than once.
+    if (startTime == null) {
+      Date minStartTime = childCandidates.get(0).getStartTime();
+      for (Candidate child : childCandidates) {
+        if (child.getStartTime().before(minStartTime)) {
+          minStartTime = child.getStartTime();
+        }
+      }
+      startTime = minStartTime;
+    }
+    return startTime;
+  }
+
+  /**
+   * @return the latest end time among the child candidates, computed on first call and cached
+   */
+  @Override
+  public Date getEndTime() {
+    if (endTime == null) {
+      Date maxEndTime = childCandidates.get(0).getEndTime();
+      for (Candidate child : childCandidates) {
+        if (child.getEndTime().after(maxEndTime)) {
+          maxEndTime = child.getEndTime();
+        }
+      }
+      endTime = maxEndTime;
+    }
+    return endTime;
+  }
+
+  /**
+   * @return the sum of the costs of all child candidates
+   */
+  @Override
+  public double getCost() {
+    double cost = 0.0;
+    for (Candidate cand : childCandidates) {
+      cost += cand.getCost();
+    }
+    return cost;
+  }
+
+  /**
+   * @return true if {@code candidate} equals this union or is contained in one of its children
+   */
+  @Override
+  public boolean contains(Candidate candidate) {
+    if (this.equals(candidate)) {
+      return true;
+    }
+    for (Candidate child : childCandidates) {
+      if (child.contains(candidate)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Collection<Candidate> getChildren() {
+    return childCandidates;
+  }
+
+  /**
+   * Splits {@code timeRange} among the children and evaluates each child's completeness for its share.
+   * Children that receive no share of the range are not evaluated.
+   *
+   * @param timeRange         queried time range
+   * @param failOnPartialData passed through unchanged to each child's evaluation
+   * @return true only if every evaluated child reports completeness for its share
+   * @throws LensException propagated from a child's evaluation
+   */
+  @Override
+  public boolean evaluateCompleteness(TimeRange timeRange, boolean failOnPartialData) throws LensException {
+    Map<Candidate, TimeRange> candidateRange = getTimeRangeForChildren(timeRange);
+    boolean ret = true;
+    for (Map.Entry<Candidate, TimeRange> entry : candidateRange.entrySet()) {
+      // '&=' does not short-circuit: every child is evaluated even after one reports false.
+      ret &= entry.getKey().evaluateCompleteness(entry.getValue(), failOnPartialData);
+    }
+    return ret;
+  }
+
+  // TODO union: not implemented yet, returns null.
+  @Override
+  public Set<FactPartition> getParticipatingPartitions() {
+    return null;
+  }
+
+  /**
+   * @return true only if every child candidate can evaluate {@code expr}
+   */
+  @Override
+  public boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr) {
+    for (Candidate cand : childCandidates) {
+      if (!cand.isExpressionEvaluable(expr)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    if (this.toStr == null) {
+      this.toStr = getToString();
+    }
+    return this.toStr;
+  }
+
+  /**
+   * Builds {@code UNION[child1, child2, ...]}; an empty child list yields {@code UNION[]}.
+   */
+  private String getToString() {
+    StringBuilder builder = new StringBuilder(10 * childCandidates.size());
+    builder.append("UNION[");
+    String separator = "";
+    for (Candidate candidate : childCandidates) {
+      builder.append(separator).append(candidate.toString());
+      separator = ", ";
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+
+  /**
+   * Assigns each child a sub-range of {@code timeRange}. Children are visited cheapest first; each child
+   * takes its overlap with the first still-unassigned range it intersects, and the uncovered pieces of
+   * that range remain available to the following children.
+   *
+   * @param timeRange queried time range
+   * @return map from child to its assigned range; children that overlap no unassigned range are absent
+   */
+  private Map<Candidate, TimeRange> getTimeRangeForChildren(TimeRange timeRange) {
+    // Sort a copy so the list held by this candidate (and returned by getChildren()) is not reordered.
+    List<Candidate> sortedByCost = new ArrayList<>(childCandidates);
+    Collections.sort(sortedByCost, new Comparator<Candidate>() {
+      @Override
+      public int compare(Candidate o1, Candidate o2) {
+        return Double.compare(o1.getCost(), o2.getCost());
+      }
+    });
+
+    Map<Candidate, TimeRange> candidateTimeRangeMap = new HashMap<>();
+    // Ranges not yet assigned to any child; starts as the whole queried range.
+    Set<TimeRange> ranges = new HashSet<>();
+    ranges.add(timeRange);
+    for (Candidate c : sortedByCost) {
+      TimeRange tr = resolveTimeRange(c, ranges, getClonedBuilder(timeRange));
+      if (tr != null) {
+        // A non-null range means this child candidate contributes to this union candidate.
+        candidateTimeRangeMap.put(c, tr);
+      }
+    }
+    return candidateTimeRangeMap;
+  }
+
+  /**
+   * Finds the first range in {@code ranges} that overlaps candidate {@code c}, assigns the overlapping part
+   * to {@code c}, and replaces that range in {@code ranges} with its uncovered leading/trailing pieces.
+   *
+   * @param c       child candidate
+   * @param ranges  ranges not yet assigned to any child; updated in place
+   * @param builder builder pre-populated with the non-date fields of the queried range
+   * @return the part assigned to {@code c}, or null if {@code c} overlaps none of {@code ranges}
+   */
+  private TimeRange resolveTimeRange(Candidate c, Set<TimeRange> ranges, TimeRange.TimeRangeBuilder builder) {
+    Iterator<TimeRange> it = ranges.iterator();
+    Set<TimeRange> newTimeRanges = new HashSet<>();
+    TimeRange ret = null;
+    while (it.hasNext()) {
+      TimeRange range = it.next();
+      long candStart = c.getStartTime().getTime();
+      long candEnd = c.getEndTime().getTime();
+      long rangeStart = range.getFromDate().getTime();
+      long rangeEnd = range.getToDate().getTime();
+      // Skip ranges the candidate does not overlap.
+      if (candStart >= rangeEnd || candEnd <= rangeStart) {
+        continue;
+      }
+      // The assigned part is the intersection of the candidate's span with this range.
+      builder.fromDate(candStart <= rangeStart ? range.getFromDate() : c.getStartTime());
+      builder.toDate(candEnd <= rangeEnd ? c.getEndTime() : range.getToDate());
+      ret = builder.build();
+      it.remove();
+      // Keep the uncovered head [range.from, ret.from), if any.
+      if (ret.getFromDate().getTime() > rangeStart) {
+        TimeRange.TimeRangeBuilder head = getClonedBuilder(ret);
+        head.fromDate(range.getFromDate());
+        head.toDate(ret.getFromDate());
+        newTimeRanges.add(head.build());
+      }
+      // Keep the uncovered tail [ret.to, range.to), if any. It must start at ret.getToDate() so the part
+      // just assigned to c is not offered again to the following children.
+      if (ret.getToDate().getTime() < rangeEnd) {
+        TimeRange.TimeRangeBuilder tail = getClonedBuilder(ret);
+        tail.fromDate(ret.getToDate());
+        tail.toDate(range.getToDate());
+        newTimeRanges.add(tail.build());
+      }
+      break;
+    }
+    ranges.addAll(newTimeRanges);
+    return ret;
+  }
+
+  /**
+   * @return a builder carrying the AST node, child index, parent and partition column of {@code timeRange};
+   * from/to dates are not set here
+   */
+  private TimeRange.TimeRangeBuilder getClonedBuilder(TimeRange timeRange) {
+    TimeRange.TimeRangeBuilder builder = new TimeRange.TimeRangeBuilder();
+    builder.astNode(timeRange.getAstNode());
+    builder.childIndex(timeRange.getChildIndex());
+    builder.parent(timeRange.getParent());
+    builder.partitionColumn(timeRange.getPartitionColumn());
+    return builder;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionQueryWriter.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionQueryWriter.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionQueryWriter.java
new file mode 100644
index 0000000..cae66d5
--- /dev/null
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionQueryWriter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lens.cube.parse;
+
+/**
+ * Helper used for creating the {@link QueryAST} of a {@link UnionCandidate}; its fields are not used yet.
+ */
+public class UnionQueryWriter {
+ /** Union candidate whose QueryAST is to be created (not used yet). */
+ private UnionCandidate candidate;
+ /** NOTE(review): role not established by this change; not used yet. */
+ private SimpleHQLContext simpleHQLContext;
+ /** QueryAST to be created for the union candidate (not used yet). */
+ private QueryAST ast;
+
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/join/AutoJoinContext.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/join/AutoJoinContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/join/AutoJoinContext.java
index 3d5c5ac..ab7a0f9 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/join/AutoJoinContext.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/join/AutoJoinContext.java
@@ -169,6 +169,7 @@ public class AutoJoinContext {
joinPathFromColumns.remove(dim);
}
+ //TODO union: use StorageCandidate
public String getFromString(String fromTable, CandidateFact fact, Set<Dimension> qdims,
Map<Dimension, CandidateDim> dimsToQuery, CubeQueryContext cubeql, QueryAST ast) throws LensException {
String fromString = fromTable;
@@ -347,6 +348,16 @@ public class AutoJoinContext {
return allPaths;
}
+ //TODO union: use Set<StorageCandidate>
+ /**
+ * Prunes the join chains defined in Cube whose starting column is not there in any of the candidate facts.
+ * Same is done in case of join paths defined in Dimensions.
+ *
+ * @param cube
+ * @param cfacts
+ * @param dimsToQuery
+ * @throws LensException
+ */
public void pruneAllPaths(CubeInterface cube, final Set<CandidateFact> cfacts,
final Map<Dimension, CandidateDim> dimsToQuery) throws LensException {
// Remove join paths which cannot be satisfied by the resolved candidate
@@ -355,6 +366,7 @@ public class AutoJoinContext {
// include columns from all picked facts
Set<String> factColumns = new HashSet<>();
for (CandidateFact cFact : cfacts) {
+ //TODO union: use StorageCandidate.getColumns()
factColumns.addAll(cFact.getColumns());
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/driver/cube/RewriterPlan.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/driver/cube/RewriterPlan.java b/lens-cube/src/main/java/org/apache/lens/driver/cube/RewriterPlan.java
index fd6c30d..a5ae425 100644
--- a/lens-cube/src/main/java/org/apache/lens/driver/cube/RewriterPlan.java
+++ b/lens-cube/src/main/java/org/apache/lens/driver/cube/RewriterPlan.java
@@ -23,8 +23,7 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.lens.cube.metadata.FactPartition;
-import org.apache.lens.cube.parse.CandidateTable;
-import org.apache.lens.cube.parse.CubeQueryContext;
+import org.apache.lens.cube.parse.*;
import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.driver.DriverQueryPlan;
import org.apache.lens.server.api.error.LensException;
@@ -49,23 +48,24 @@ public final class RewriterPlan extends DriverQueryPlan {
for (CubeQueryContext ctx : cubeQueries) {
if (ctx.getPickedDimTables() != null && !ctx.getPickedDimTables().isEmpty()) {
- for (CandidateTable dim : ctx.getPickedDimTables()) {
- addTablesQueried(dim.getStorageTables());
+ for (CandidateDim dim : ctx.getPickedDimTables()) {
+ addTablesQueried(dim.getStorageName());
if (partitions.get(dim.getName()) == null || partitions.get(dim.getName()).isEmpty()) {
// puts storage table to latest part
- partitions.put(dim.getName(), dim.getPartsQueried());
+ partitions.put(dim.getName(), dim.getParticipatingPartitions());
}
}
}
- if (ctx.getPickedFacts() != null && !ctx.getPickedFacts().isEmpty()) {
- for (CandidateTable fact : ctx.getPickedFacts()) {
- addTablesQueried(fact.getStorageTables());
- Set<FactPartition> factParts = (Set<FactPartition>) partitions.get(fact.getName());
+ //TODO union: updated code to work on picked Candidate
+ if (ctx.getPickedCandidate() != null) {
+ for (StorageCandidate sc : CandidateUtil.getStorageCandidates(ctx.getPickedCandidate())) {
+ addTablesQueried(sc.getStorageName());
+ Set<FactPartition> factParts = (Set<FactPartition>) partitions.get(sc.getName());
if (factParts == null) {
factParts = new HashSet<FactPartition>();
- partitions.put(fact.getName(), factParts);
+ partitions.put(sc.getName(), factParts);
}
- factParts.addAll((Set<FactPartition>) fact.getPartsQueried());
+ factParts.addAll((Set<FactPartition>) sc.getParticipatingPartitions());
}
}
for (String table : getTablesQueried()) {
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/test/java/org/apache/lens/cube/parse/CubeTestSetup.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/test/java/org/apache/lens/cube/parse/CubeTestSetup.java b/lens-cube/src/test/java/org/apache/lens/cube/parse/CubeTestSetup.java
index 41ea83d..90be92d 100644
--- a/lens-cube/src/test/java/org/apache/lens/cube/parse/CubeTestSetup.java
+++ b/lens-cube/src/test/java/org/apache/lens/cube/parse/CubeTestSetup.java
@@ -537,8 +537,17 @@ public class CubeTestSetup {
"New measure", null, null, null, NOW, null, 100.0));
cubeMeasures.add(new ColumnMeasure(new FieldSchema("msr15", "int", "fifteenth measure"), "Measure15", null, "SUM",
"RS"));
+ String prefix = "union_join_ctx_";
+ cubeMeasures.add(new ColumnMeasure(new FieldSchema(prefix + "msr1", "int", prefix + "first measure")));
+ cubeMeasures.add(new ColumnMeasure(new FieldSchema(prefix + "msr2", "int", prefix + "second measure")));
+ cubeMeasures.add(new ColumnMeasure(new FieldSchema(prefix + "msr3", "int", prefix + "third measure")));
cubeDimensions = new HashSet<CubeDimAttribute>();
+
+ cubeDimensions.add(new BaseDimAttribute(new FieldSchema(prefix + "d_time", "timestamp", "d time")));
+ cubeDimensions.add(new BaseDimAttribute(new FieldSchema(prefix + "cityid", "timestamp", "the cityid ")));
+ cubeDimensions.add(new BaseDimAttribute(new FieldSchema(prefix + "zipcode", "timestamp", "the zipcode")));
+
cubeDimensions.add(new BaseDimAttribute(new FieldSchema("d_time", "timestamp", "d time")));
cubeDimensions.add(new BaseDimAttribute(new FieldSchema("processing_time", "timestamp", "processing time")));
List<CubeDimAttribute> locationHierarchy = new ArrayList<CubeDimAttribute>();
@@ -1268,6 +1277,113 @@ public class CubeTestSetup {
// create base cube facts
createBaseCubeFacts(client);
+ // create join and union ctx facts
+ createUnionAndJoinContextFacts(client);
+ }
+
+ private void createUnionAndJoinContextFacts(CubeMetastoreClient client) throws HiveException, LensException {
+ String prefix = "union_join_ctx_";
+ String derivedCubeName = prefix + "der1";
+ Map<String, Set<UpdatePeriod>> storageAggregatePeriods = new HashMap<String, Set<UpdatePeriod>>();
+ Set<UpdatePeriod> updates = new HashSet<UpdatePeriod>();
+ updates.add(DAILY);
+
+ ArrayList<FieldSchema> partCols = new ArrayList<FieldSchema>();
+ List<String> timePartCols = new ArrayList<String>();
+ partCols.add(TestCubeMetastoreClient.getDatePartition());
+ timePartCols.add(TestCubeMetastoreClient.getDatePartitionKey());
+
+ StorageTableDesc s1 = new StorageTableDesc();
+ s1.setInputFormat(TextInputFormat.class.getCanonicalName());
+ s1.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getCanonicalName());
+ s1.setPartCols(partCols);
+ s1.setTimePartCols(timePartCols);
+ // storage c1 with DAILY updates and descriptor s1; every fact below is created with these two maps
+ storageAggregatePeriods.put(c1, updates);
+
+ Map<String, StorageTableDesc> storageTables = new HashMap<String, StorageTableDesc>();
+ storageTables.put(c1, s1);
+
+ // create fact1 (all dim attributes, only msr1), valid from now.day - 90 days to now.day - 30 days
+ String factName = prefix + "fact1";
+ List<FieldSchema> factColumns = new ArrayList<FieldSchema>();
+ factColumns.add(new ColumnMeasure(new FieldSchema(prefix + "msr1", "int", "first measure")).getColumn());
+ factColumns.add(new FieldSchema("d_time", "timestamp", "event time"));
+ factColumns.add(new FieldSchema(prefix + "zipcode", "int", "zip"));
+ factColumns.add(new FieldSchema(prefix + "cityid", "int", "city id"));
+ // add fact start and end time property (on top of a copy of factValidityProperties)
+ Map<String, String> properties = Maps.newHashMap(factValidityProperties);
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_START_TIME, DateUtil.relativeToAbsolute("now.day - 90 days"));
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_END_TIME, DateUtil.relativeToAbsolute("now.day - 30 days"));
+ client.createCubeFactTable(BASE_CUBE_NAME, factName, factColumns, storageAggregatePeriods, 5L,
+ properties, storageTables);
+
+ // create fact2 with the same columns as fact1; its range starts at now.day - 31 days, one day before fact1 ends
+ factName = prefix + "fact2";
+ properties.clear(); // NOTE(review): this also drops the factValidityProperties entries used for fact1; confirm intended
+ // factColumns is reused unchanged, so fact2 has only msr1 among the measures
+ // add fact start and end time property
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_START_TIME, DateUtil.relativeToAbsolute("now.day - 31 days"));
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_END_TIME, DateUtil.relativeToAbsolute("now.day + 7 days"));
+ client.createCubeFactTable(BASE_CUBE_NAME, factName, factColumns, storageAggregatePeriods, 5L,
+ properties, storageTables);
+
+ // create fact3 (all dim attributes, only msr2) covering now.day - 90 days to now.day + 7 days
+ factName = prefix + "fact3";
+ factColumns.clear();
+ factColumns.add(new ColumnMeasure(new FieldSchema(prefix + "msr2", "int", "second measure")).getColumn());
+ factColumns.add(new FieldSchema("d_time", "timestamp", "event time"));
+ factColumns.add(new FieldSchema(prefix + "zipcode", "int", "zip"));
+ factColumns.add(new FieldSchema(prefix + "cityid", "int", "city id"));
+ properties.clear();
+ // add fact start and end time property
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_START_TIME, DateUtil.relativeToAbsolute("now.day - 90 days"));
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_END_TIME, DateUtil.relativeToAbsolute("now.day + 7 days"));
+ client.createCubeFactTable(BASE_CUBE_NAME, factName, factColumns, storageAggregatePeriods, 5L,
+ properties, storageTables);
+
+ // create fact4 with fact3's columns plus msr1 (i.e. msr1 and msr2, not msr3), covering the same range as fact3
+ factName = prefix + "fact4";
+ factColumns.add(new ColumnMeasure(new FieldSchema(prefix + "msr1", "int", "first measure")).getColumn());
+ properties.clear();
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_START_TIME, DateUtil.relativeToAbsolute("now.day - 90 days"));
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_END_TIME, DateUtil.relativeToAbsolute("now.day + 7 days"));
+ client.createCubeFactTable(BASE_CUBE_NAME, factName, factColumns, storageAggregatePeriods, 5L,
+ properties, storageTables);
+
+ // create fact5 and fact6 with msr3 only; fact5 covers now.day - 90 to - 30 days, fact6 now.day - 31 to + 7 days
+ factName = prefix + "fact5";
+ factColumns.clear();
+ factColumns.add(new FieldSchema("d_time", "timestamp", "event time"));
+ factColumns.add(new FieldSchema(prefix + "zipcode", "int", "zip"));
+ factColumns.add(new FieldSchema(prefix + "cityid", "int", "city id"));
+ factColumns.add(new ColumnMeasure(new FieldSchema(prefix + "msr3", "int", "third measure")).getColumn());
+ properties.clear();
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_START_TIME, DateUtil.relativeToAbsolute("now.day - 90 days"));
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_END_TIME, DateUtil.relativeToAbsolute("now.day -30 days"));
+ client.createCubeFactTable(BASE_CUBE_NAME, factName, factColumns, storageAggregatePeriods, 5L,
+ properties, storageTables);
+
+ factName = prefix + "fact6";
+ properties.clear();
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_START_TIME, DateUtil.relativeToAbsolute("now.day -31 days"));
+ properties.put(MetastoreConstants.FACT_ABSOLUTE_END_TIME, DateUtil.relativeToAbsolute("now.day + 7 days"));
+ client.createCubeFactTable(BASE_CUBE_NAME, factName, factColumns, storageAggregatePeriods, 5L,
+ properties, storageTables);
+
+ // Create derived cube exposing msr1, msr2, msr3 and the cityid, zipcode and d_time dimensions
+ Map<String, String> derivedProperties = new HashMap<>();
+ derivedProperties.put(MetastoreConstants.CUBE_ALL_FIELDS_QUERIABLE, "true");
+ Set<String> measures = new HashSet<>();
+ measures.add(prefix + "msr1");
+ measures.add(prefix + "msr2");
+ measures.add(prefix + "msr3");
+ Set<String> dimensions = new HashSet<>();
+ dimensions.add(prefix + "cityid");
+ dimensions.add(prefix + "zipcode");
+ dimensions.add("d_time");
+ client.createDerivedCube(BASE_CUBE_NAME, derivedCubeName, measures, dimensions, derivedProperties, 5L);
+
 }
private void createBaseCubeFacts(CubeMetastoreClient client) throws HiveException, LensException {
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/test/java/org/apache/lens/cube/parse/TestUnionAndJoinCandidates.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/test/java/org/apache/lens/cube/parse/TestUnionAndJoinCandidates.java b/lens-cube/src/test/java/org/apache/lens/cube/parse/TestUnionAndJoinCandidates.java
new file mode 100644
index 0000000..061224e
--- /dev/null
+++ b/lens-cube/src/test/java/org/apache/lens/cube/parse/TestUnionAndJoinCandidates.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.cube.parse;
+
+import static org.apache.lens.cube.metadata.DateFactory.*;
+import static org.apache.lens.cube.parse.CubeQueryConfUtil.*;
+import static org.apache.lens.cube.parse.CubeTestSetup.*;
+
+import static org.testng.Assert.assertNotNull;
+
+import org.apache.lens.server.api.LensServerAPITestUtil;
+import org.apache.lens.server.api.error.LensException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+/**
+ * Rewrite tests for queries on the {@code union_join_ctx_der1} derived cube, whose facts are created in
+ * {@link CubeTestSetup} with differing validity ranges and measures.
+ */
+public class TestUnionAndJoinCandidates extends TestQueryRewrite {
+
+  private Configuration testConf;
+
+  @BeforeTest
+  public void setupDriver() throws Exception {
+    testConf = LensServerAPITestUtil.getConfiguration(
+        DISABLE_AUTO_JOINS, false,
+        ENABLE_SELECT_TO_GROUPBY, true,
+        ENABLE_GROUP_BY_TO_SELECT, true,
+        DISABLE_AGGREGATE_RESOLVER, false,
+        ENABLE_STORAGES_UNION, true);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return new Configuration(testConf);
+  }
+
+  /**
+   * Selects msr1, msr2 and msr3 over TWO_MONTHS_RANGE_UPTO_DAYS, restricted to storage C1, the fact1..fact3
+   * storage tables and DAILY update periods.
+   */
+  @Test
+  public void testRangeCoveringCandidates() throws ParseException, LensException {
+    try {
+      String prefix = "union_join_ctx_";
+      String cubeName = prefix + "der1";
+      Configuration conf = LensServerAPITestUtil.getConfigurationWithParams(getConf(),
+          //Supported storage
+          CubeQueryConfUtil.DRIVER_SUPPORTED_STORAGES, "C1",
+          // Storage tables
+          getValidStorageTablesKey(prefix + "fact1"), "C1_" + prefix + "fact1",
+          getValidStorageTablesKey(prefix + "fact2"), "C1_" + prefix + "fact2",
+          getValidStorageTablesKey(prefix + "fact3"), "C1_" + prefix + "fact3",
+          // Update periods
+          getValidUpdatePeriodsKey(prefix + "fact1", "C1"), "DAILY",
+          getValidUpdatePeriodsKey(prefix + "fact2", "C1"), "DAILY",
+          getValidUpdatePeriodsKey(prefix + "fact3", "C1"), "DAILY");
+
+      String colsSelected = prefix + "cityid , " + prefix + "zipcode , " + "sum(" + prefix + "msr1) , "
+          + "sum(" + prefix + "msr2), " + "sum(" + prefix + "msr3) ";
+
+      String whereCond = prefix + "zipcode = 'a' and " + prefix + "cityid = 'b' and "
+          + "(" + TWO_MONTHS_RANGE_UPTO_DAYS + ")";
+      String hqlQuery = rewrite("select " + colsSelected + " from " + cubeName + " where " + whereCond, conf);
+
+      // Only checks that a query is produced; the expected HQL is not asserted yet.
+      assertNotNull(hqlQuery, "Rewritten query should not be null");
+    } finally {
+      getStorageToUpdatePeriodMap().clear();
+    }
+  }
+
+}
[2/3] lens git commit: Initial changes for Union
Posted by pu...@apache.org.
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java
index 11eb8f7..64a9626 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java
@@ -98,89 +98,93 @@ class QueriedPhraseContext extends TracksQueriedColumns implements TrackQueriedC
return false;
}
- boolean isEvaluable(CubeQueryContext cubeQl, CandidateFact cfact) throws LensException {
+ /**
+ * TODO union: change CandidateFact to StorageCandidate. Let the callers typecast and send for now.
+ * @param cubeQl
+ * @param sc
+ * @return
+ * @throws LensException
+ */
+ public boolean isEvaluable(CubeQueryContext cubeQl, StorageCandidate sc) throws LensException {
// all measures of the queried phrase should be present
for (String msr : queriedMsrs) {
- if (!checkForColumnExistsAndValidForRange(cfact, msr, cubeQl)) {
+ if (!checkForColumnExistsAndValidForRange(sc, msr, cubeQl)) {
return false;
}
}
// all expression columns should be evaluable
for (String exprCol : queriedExprColumns) {
- if (!cubeQl.getExprCtx().isEvaluable(exprCol, cfact)) {
- log.info("expression {} is not evaluable in fact table:{}", expr, cfact);
+ if (!cubeQl.getExprCtx().isEvaluable(exprCol, sc)) {
+ log.info("expression {} is not evaluable in fact table:{}", expr, sc);
return false;
}
}
// all dim-attributes should be present.
for (String col : queriedDimAttrs) {
- if (!cfact.getColumns().contains(col.toLowerCase())) {
+ if (!sc.getColumns().contains(col.toLowerCase())) {
// check if it available as reference
- if (!cubeQl.getDeNormCtx().addRefUsage(cfact, col, cubeQl.getCube().getName())) {
- log.info("column {} is not available in fact table:{} ", col, cfact);
+ if (!cubeQl.getDeNormCtx().addRefUsage(sc, col, cubeQl.getCube().getName())) {
+ log.info("column {} is not available in fact table:{} ", col, sc);
return false;
}
- } else if (!isFactColumnValidForRange(cubeQl, cfact, col)) {
- log.info("column {} is not available in range queried in fact {}", col, cfact);
+ } else if (!isFactColumnValidForRange(cubeQl, sc, col)) {
+ log.info("column {} is not available in range queried in fact {}", col, sc);
return false;
}
}
return true;
}
- public static boolean isColumnAvailableInRange(final TimeRange range, Date startTime, Date endTime) {
+ private static boolean isColumnAvailableInRange(final TimeRange range, Date startTime, Date endTime) {
return (isColumnAvailableFrom(range.getFromDate(), startTime)
&& isColumnAvailableTill(range.getToDate(), endTime));
}
- public static boolean isColumnAvailableFrom(@NonNull final Date date, Date startTime) {
+ private static boolean isColumnAvailableFrom(@NonNull final Date date, Date startTime) {
return (startTime == null) || date.equals(startTime) || date.after(startTime);
}
- public static boolean isColumnAvailableTill(@NonNull final Date date, Date endTime) {
+ private static boolean isColumnAvailableTill(@NonNull final Date date, Date endTime) {
return (endTime == null) || date.equals(endTime) || date.before(endTime);
}
- public static boolean isFactColumnValidForRange(CubeQueryContext cubeql, CandidateTable cfact, String col) {
+ public static boolean isFactColumnValidForRange(CubeQueryContext cubeql, StorageCandidate sc, String col) {
for(TimeRange range : cubeql.getTimeRanges()) {
- if (!isColumnAvailableInRange(range, getFactColumnStartTime(cfact, col), getFactColumnEndTime(cfact, col))) {
+ if (!isColumnAvailableInRange(range, getFactColumnStartTime(sc, col), getFactColumnEndTime(sc, col))) {
return false;
}
}
return true;
}
- public static Date getFactColumnStartTime(CandidateTable table, String factCol) {
+ public static Date getFactColumnStartTime(StorageCandidate sc, String factCol) {
Date startTime = null;
- if (table instanceof CandidateFact) {
- for (String key : ((CandidateFact) table).fact.getProperties().keySet()) {
+ for (String key : sc.getTable().getProperties().keySet()) {
if (key.contains(MetastoreConstants.FACT_COL_START_TIME_PFX)) {
String propCol = StringUtils.substringAfter(key, MetastoreConstants.FACT_COL_START_TIME_PFX);
if (factCol.equals(propCol)) {
- startTime = ((CandidateFact) table).fact.getDateFromProperty(key, false, true);
+ startTime = sc.getTable().getDateFromProperty(key, false, true);
}
}
}
- }
return startTime;
}
- public static Date getFactColumnEndTime(CandidateTable table, String factCol) {
+ public static Date getFactColumnEndTime(StorageCandidate sc, String factCol) {
Date endTime = null;
- if (table instanceof CandidateFact) {
- for (String key : ((CandidateFact) table).fact.getProperties().keySet()) {
+ for (String key : sc.getTable().getProperties().keySet()) {
if (key.contains(MetastoreConstants.FACT_COL_END_TIME_PFX)) {
String propCol = StringUtils.substringAfter(key, MetastoreConstants.FACT_COL_END_TIME_PFX);
if (factCol.equals(propCol)) {
- endTime = ((CandidateFact) table).fact.getDateFromProperty(key, false, true);
+ endTime = sc.getTable().getDateFromProperty(key, false, true);
}
}
}
- }
- return endTime;
+ return endTime;
}
- static boolean checkForColumnExistsAndValidForRange(CandidateTable table, String column, CubeQueryContext cubeql) {
- return (table.getColumns().contains(column) && isFactColumnValidForRange(cubeql, table, column));
+ static boolean checkForColumnExistsAndValidForRange(StorageCandidate sc, String column, CubeQueryContext cubeql) {
+ return (sc.getColumns().contains(column) && isFactColumnValidForRange(cubeql, sc, column));
}
+
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java
index 7298604..bdd6376 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java
@@ -83,4 +83,6 @@ public interface QueryAST {
ASTNode getOrderByAST();
void setOrderByAST(ASTNode node);
+
+ void setJoinAST(ASTNode node);
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java
new file mode 100644
index 0000000..22038f3
--- /dev/null
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.cube.parse;
+
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.*;
+import static org.apache.lens.cube.parse.StorageUtil.*;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import org.apache.lens.cube.metadata.*;
+import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.api.metastore.DataCompletenessChecker;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import com.google.common.collect.Sets;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Represents a fact on a storage table and the dimensions it needs to be joined with to answer the query.
+ * <p>
+ * Two candidates are equal when they share the same cube and fact instances and the same storage name. The storage
+ * table name ({@link #getName()}) is derived from the fact name and the storage name.
+ */
+@Slf4j
+public class StorageCandidate implements Candidate, CandidateTable {
+
+  @Getter
+  private final CubeQueryContext cubeql;
+  private final TimeRangeWriter rangeWriter;
+  private final String processTimePartCol;
+  private final CubeMetastoreClient client;
+  private final String completenessPartCol;
+  private final float completenessThreshold;
+  /**
+   * Storage table name, built from the fact name and the storage name.
+   */
+  @Getter
+  private final String name;
+  /**
+   * Valid update periods populated by Phase 1.
+   */
+  private TreeSet<UpdatePeriod> validUpdatePeriods = new TreeSet<>();
+  private Configuration conf = null;
+  /**
+   * Measure (or expression) name -> (formatted date -> completeness) for values below the completeness threshold.
+   */
+  private Map<String, Map<String, Float>> incompleteMeasureData = new HashMap<>();
+  private SimpleDateFormat partWhereClauseFormat = null;
+  /**
+   * Participating fact, storage and dimensions for this StorageCandidate
+   */
+  @Getter
+  private CubeFactTable fact;
+  @Getter
+  private String storageName;
+  private Map<Dimension, CandidateDim> dimensions;
+  /**
+   * Time range -> partition where clause computed for it by evaluateCompleteness().
+   */
+  private Map<TimeRange, String> rangeToWhere = new LinkedHashMap<>();
+  @Getter
+  private CubeInterface cube;
+  /**
+   * Cached fact columns
+   */
+  private Collection<String> factColumns;
+  /**
+   * This map holds Tags (A tag refers to one or more measures) that have incomplete (below configured threshold) data.
+   * Value is a map of date string and %completeness.
+   */
+  @Getter
+  @Setter
+  private Map<String, Map<String, Float>> incompleteDataDetails;
+  /**
+   * Partitions collected by evaluateCompleteness() across all the time ranges evaluated so far.
+   */
+  private Set<FactPartition> storagePartitions = new HashSet<>();
+  /**
+   * Non existing partitions
+   */
+  private Set<String> nonExistingPartitions = new HashSet<>();
+  @Getter
+  private String alias = null;
+
+  /**
+   * Creates a storage candidate for a fact on a storage.
+   *
+   * @param cube        cube of the fact
+   * @param fact        fact table
+   * @param storageName storage the fact table is stored on
+   * @param alias       alias of this candidate
+   * @param cubeql      query context; supplies the configuration and the metastore client
+   * @throws IllegalArgumentException if any argument is null
+   */
+  public StorageCandidate(CubeInterface cube, CubeFactTable fact, String storageName, String alias,
+    CubeQueryContext cubeql) {
+    if ((cube == null) || (fact == null) || (storageName == null) || (alias == null) || (cubeql == null)) {
+      throw new IllegalArgumentException("Cube, fact, storageName, alias and cubeql should be non null");
+    }
+    this.cube = cube;
+    this.fact = fact;
+    this.cubeql = cubeql;
+    this.storageName = storageName;
+    this.conf = cubeql.getConf();
+    this.alias = alias;
+    this.name = MetastoreUtil.getFactOrDimtableStorageTableName(fact.getName(), storageName);
+    // Hadoop's ReflectionUtils uses the no-arg constructor and then applies conf. The HBase ReflectionUtils imported
+    // in this file would instead look for a constructor that accepts the passed Configuration.
+    rangeWriter = org.apache.hadoop.util.ReflectionUtils.newInstance(conf
+      .getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS, CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER,
+        TimeRangeWriter.class), conf);
+    this.processTimePartCol = conf.get(CubeQueryConfUtil.PROCESS_TIME_PART_COL);
+    String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT);
+    if (formatStr != null) {
+      this.partWhereClauseFormat = new SimpleDateFormat(formatStr);
+    }
+    completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL);
+    client = cubeql.getMetastoreClient();
+    completenessThreshold = conf
+      .getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD);
+  }
+
+  /**
+   * Not implemented yet; always returns null.
+   */
+  @Override
+  public String toHQL() {
+    return null;
+  }
+
+  /**
+   * Not implemented yet; always returns null.
+   */
+  @Override
+  public QueryAST getQueryAst() {
+    return null;
+  }
+
+  /**
+   * Not implemented yet; always returns null.
+   */
+  @Override
+  public String getStorageString(String alias) {
+    return null;
+  }
+
+  /**
+   * @return the fact table of this candidate
+   */
+  @Override
+  public AbstractCubeTable getTable() {
+    return fact;
+  }
+
+  /**
+   * @return the cube passed at construction
+   */
+  @Override
+  public AbstractCubeTable getBaseTable() {
+    return (AbstractCubeTable) cube;
+  }
+
+  /**
+   * Returns the fact's valid columns, or all of its field names when no valid columns are set. The result is
+   * cached after the first call.
+   */
+  @Override
+  public Collection<String> getColumns() {
+    if (factColumns == null) {
+      factColumns = fact.getValidColumns();
+      if (factColumns == null) {
+        factColumns = fact.getAllFieldNames();
+      }
+    }
+    return factColumns;
+  }
+
+  @Override
+  public Date getStartTime() {
+    return fact.getStartTime();
+  }
+
+  @Override
+  public Date getEndTime() {
+    return fact.getEndTime();
+  }
+
+  /**
+   * @return the weight of the fact table
+   */
+  @Override
+  public double getCost() {
+    return fact.weight();
+  }
+
+  /**
+   * A storage candidate contains only itself.
+   */
+  @Override
+  public boolean contains(Candidate candidate) {
+    return this.equals(candidate);
+  }
+
+  /**
+   * A storage candidate has no child candidates; returns null.
+   */
+  @Override
+  public Collection<Candidate> getChildren() {
+    return null;
+  }
+
+  /**
+   * Marks {@code part} as found and adds this storage table to its storage tables. This happens only when the
+   * partition spec is a candidate for this storage table and the partition exists. A HiveException is logged,
+   * and the partition is then left untouched.
+   */
+  private void updatePartitionStorage(FactPartition part) throws LensException {
+    try {
+      if (client.isStorageTablePartitionACandidate(name, part.getPartSpec()) && (client
+        .factPartitionExists(fact, part, name))) {
+        part.getStorageTables().add(name);
+        part.setFound(true);
+      }
+    } catch (HiveException e) {
+      log.warn("Hive exception while getting storage table partition", e);
+    }
+  }
+
+  /**
+   * Gets FactPartitions for the given fact using the following logic
+   *
+   * 1. Find the max update interval that will be used for the query. Let's assume the time range is 15 Sep to 15 Dec
+   * and the fact has two storages with update periods MONTHLY,DAILY,HOURLY. In this case the data for
+   * [15 sep - 1 oct)U[1 Dec - 15 Dec) will be answered by DAILY partitions and [1 oct - 1Dec) will be answered by
+   * MONTHLY partitions. The max interval for this query will be MONTHLY.
+   *
+   * 2. Prune storages that do not fall in the query's time range.
+   * {@link CubeMetastoreClient#isStorageTableCandidateForRange(String, Date, Date)}
+   *
+   * 3. Iterate over the max interval. In our case it will give two months, Oct and Nov. Find partitions for these two
+   * months. Check validity of FactPartitions for Oct and Nov via {@link #updatePartitionStorage(FactPartition)}.
+   * If a partition is missing, try getting partitions for that time range from the other update periods
+   * (DAILY,HOURLY). This is achieved by calling getPartitions() recursively but passing only 2 update periods
+   * (DAILY,HOURLY).
+   *
+   * 4. If the monthly partitions are found, check for lookahead partitions and call getPartitions recursively for the
+   * remaining time intervals i.e, [15 sep - 1 oct) and [1 Dec - 15 Dec)
+   *
+   * @param fromDate            start of the range
+   * @param toDate              end of the range; nothing is done when it is not after fromDate
+   * @param partCol             partition column to build partitions for
+   * @param partitions          output: partitions found (and, when allowed, missing ones) are added here
+   * @param updatePeriods       update periods that may be used
+   * @param addNonExistingParts whether missing partitions are recorded in missingPartitions
+   * @param failOnPartialData   when false, a recorded missing partition is also added to partitions, provided the
+   *                            storage table is eligible for its partition spec
+   * @param missingPartitions   output: missing partitions are recorded here
+   * @return false when this storage table cannot answer the range, true otherwise
+   */
+  private boolean getPartitions(Date fromDate, Date toDate, String partCol, Set<FactPartition> partitions,
+    TreeSet<UpdatePeriod> updatePeriods, boolean addNonExistingParts, boolean failOnPartialData,
+    PartitionRangesForPartitionColumns missingPartitions) throws LensException {
+    if (fromDate.equals(toDate) || fromDate.after(toDate)) {
+      return true;
+    }
+    UpdatePeriod interval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods);
+    if (interval == null) {
+      log.info("No max interval for range: {} to {}", fromDate, toDate);
+      return false;
+    }
+
+    if (interval == UpdatePeriod.CONTINUOUS && rangeWriter.getClass().equals(BetweenTimeRangeWriter.class)) {
+      FactPartition part = new FactPartition(partCol, fromDate, interval, null, partWhereClauseFormat);
+      partitions.add(part);
+      part.getStorageTables().add(name);
+      part = new FactPartition(partCol, toDate, interval, null, partWhereClauseFormat);
+      partitions.add(part);
+      part.getStorageTables().add(name);
+      log.info("Added continuous fact partition for storage table {}", name);
+      return true;
+    }
+
+    if (!client.isStorageTableCandidateForRange(name, fromDate, toDate)) {
+      cubeql.addStoragePruningMsg(this,
+        new CandidateTablePruneCause(CandidateTablePruneCause.CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE));
+      return false;
+    } else if (!client.partColExists(name, partCol)) {
+      log.info("{} does not exist in {}", partCol, name);
+      List<String> missingCols = new ArrayList<>();
+      missingCols.add(partCol);
+      cubeql.addStoragePruningMsg(this, partitionColumnsMissing(missingCols));
+      return false;
+    }
+
+    Date ceilFromDate = DateUtil.getCeilDate(fromDate, interval);
+    Date floorToDate = DateUtil.getFloorDate(toDate, interval);
+
+    int lookAheadNumParts = conf
+      .getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(interval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS);
+
+    TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, interval, 1).iterator();
+    // add partitions from ceilFrom to floorTo
+    while (iter.hasNext()) {
+      Date dt = iter.next();
+      Date nextDt = iter.peekNext();
+      FactPartition part = new FactPartition(partCol, dt, interval, null, partWhereClauseFormat);
+      updatePartitionStorage(part);
+      log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables());
+      if (part.isFound()) {
+        log.debug("Adding existing partition {}", part);
+        partitions.add(part);
+        log.debug("Looking for look ahead process time partitions for {}", part);
+        if (processTimePartCol == null) {
+          log.debug("processTimePartCol is null");
+        } else if (partCol.equals(processTimePartCol)) {
+          log.debug("part column is process time col");
+        } else if (updatePeriods.first().equals(interval)) {
+          log.debug("Update period is the least update period");
+        } else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) {
+          // see if this is the part of the last-n look ahead partitions
+          log.debug("Not a look ahead partition");
+        } else {
+          log.debug("Looking for look ahead process time partitions for {}", part);
+          // Finer partitions are required if no partitions from the look-ahead process time are present.
+          TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts, interval, 1)
+            .iterator();
+          while (processTimeIter.hasNext()) {
+            Date pdt = processTimeIter.next();
+            Date nextPdt = processTimeIter.peekNext();
+            FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, interval, null,
+              partWhereClauseFormat);
+            updatePartitionStorage(processTimePartition);
+            if (processTimePartition.isFound()) {
+              log.debug("Finer parts not required for look-ahead partition :{}", part);
+            } else {
+              log.debug("Looked ahead process time partition {} is not found", processTimePartition);
+              TreeSet<UpdatePeriod> newset = new TreeSet<>();
+              newset.addAll(updatePeriods);
+              newset.remove(interval);
+              log.debug("newset of update periods:{}", newset);
+              if (!newset.isEmpty()) {
+                // Get partitions for look ahead process time
+                log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt);
+                Set<FactPartition> processTimeParts = getPartitions(
+                  TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn(processTimePartCol).build(),
+                  newset, true, false, missingPartitions);
+                log.debug("Look ahead partitions: {}", processTimeParts);
+                TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build();
+                for (FactPartition pPart : processTimeParts) {
+                  log.debug("Looking for finer partitions in pPart: {}", pPart);
+                  for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) {
+                    FactPartition innerPart = new FactPartition(partCol, date, pPart.getPeriod(), pPart,
+                      partWhereClauseFormat);
+                    updatePartitionStorage(innerPart);
+                    // The inner partition inherits its "found" state from the process time partition.
+                    innerPart.setFound(pPart.isFound());
+                    if (innerPart.isFound()) {
+                      partitions.add(innerPart);
+                    }
+                  }
+                  log.debug("added all sub partitions blindly in pPart: {}", pPart);
+                }
+              }
+            }
+          }
+        }
+      } else {
+        log.info("Partition:{} does not exist in any storage table", part);
+        TreeSet<UpdatePeriod> newset = new TreeSet<>();
+        newset.addAll(updatePeriods);
+        newset.remove(interval);
+        if (!getPartitions(dt, nextDt, partCol, partitions, newset, false, failOnPartialData, missingPartitions)) {
+          log.debug("Adding non existing partition {}", part);
+          if (addNonExistingParts) {
+            // Add non existing partitions for all cases of whether we populate all non existing or not.
+            missingPartitions.add(part);
+            if (!failOnPartialData) {
+              // A missing partition may only be kept as a placeholder if this storage table is eligible for it.
+              if (!client.isStorageTablePartitionACandidate(name, part.getPartSpec())) {
+                log.info("Storage tables not eligible");
+                return false;
+              }
+              partitions.add(part);
+              part.getStorageTables().add(name);
+            }
+          } else {
+            log.info("No finer granual partitions exist for {}", part);
+            return false;
+          }
+        } else {
+          log.debug("Finer granual partitions added for {}", part);
+        }
+      }
+    }
+    // Cover the leading [fromDate, ceilFromDate) and trailing [floorToDate, toDate) remainders.
+    return getPartitions(fromDate, ceilFromDate, partCol, partitions, updatePeriods, addNonExistingParts,
+      failOnPartialData, missingPartitions)
+      && getPartitions(floorToDate, toDate, partCol, partitions, updatePeriods, addNonExistingParts,
+      failOnPartialData, missingPartitions);
+  }
+
+  /**
+   * Finds all the partitions of this storage table for a time range. It also records the partition where clause
+   * for that range, falling back to other partition columns when the range's own column yields no partitions.
+   *
+   * @param timeRange         time range to check completeness for: start time, end time and the partition column
+   * @param failOnPartialData fail fast if the candidate can answer the query only partially
+   * @return true if partitions were found (for the range or one of its fallback ranges) and, when
+   *         {@code failOnPartialData} is set, no measure data or partition is missing; false otherwise
+   * @throws LensException propagated from the partition and completeness lookups
+   */
+  @Override
+  public boolean evaluateCompleteness(TimeRange timeRange, boolean failOnPartialData) throws LensException {
+    // Check the measure tags.
+    if (!evaluateMeasuresCompleteness(timeRange)) {
+      log.info("Fact table:{} has partitions with incomplete data: {} for given ranges: {}", fact,
+        incompleteMeasureData, cubeql.getTimeRanges());
+      cubeql.addStoragePruningMsg(this, incompletePartitions(incompleteMeasureData));
+      if (failOnPartialData) {
+        return false;
+      }
+    }
+    PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns();
+    PruneCauses<StorageCandidate> storagePruningMsgs = cubeql.getStoragePruningMsgs();
+    Set<String> unsupportedTimeDims = Sets.newHashSet();
+    Set<String> partColsQueried = Sets.newHashSet();
+    partColsQueried.add(timeRange.getPartitionColumn());
+    StringBuilder extraWhereClauseFallback = new StringBuilder();
+    Set<FactPartition> rangeParts = getPartitions(timeRange, validUpdatePeriods, true, failOnPartialData, missingParts);
+    String partCol = timeRange.getPartitionColumn();
+    boolean partColNotSupported = rangeParts.isEmpty();
+    // Prune causes are keyed by StorageCandidate, so look them up with this candidate, not with a storage name.
+    if (storagePruningMsgs.containsKey(this)) {
+      List<CandidateTablePruneCause> causes = storagePruningMsgs.get(this);
+      // Find the PART_COL_DOES_NOT_EXISTS
+      for (CandidateTablePruneCause cause : causes) {
+        if (cause.getCause().equals(CandidateTablePruneCode.PART_COL_DOES_NOT_EXIST)) {
+          partColNotSupported = cause.getNonExistantPartCols().contains(partCol);
+        }
+      }
+    }
+    TimeRange prevRange = timeRange;
+    String sep = "";
+    while (rangeParts.isEmpty()) {
+      String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol);
+      // Compare against column names. NOTE(review): the fact's own getColumns() is believed to list column schemas
+      // rather than names, which would make contains(timeDim) always false.
+      if (partColNotSupported && !getColumns().contains(timeDim)) {
+        unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(timeRange.getPartitionColumn()));
+        break;
+      }
+      TimeRange fallBackRange = getFallbackRange(prevRange, this.getFact().getName(), cubeql);
+      log.info("No partitions for range:{}. fallback range: {}", timeRange, fallBackRange);
+      if (fallBackRange == null) {
+        break;
+      }
+      partColsQueried.add(fallBackRange.getPartitionColumn());
+      rangeParts = getPartitions(fallBackRange, validUpdatePeriods, true, failOnPartialData, missingParts);
+      extraWhereClauseFallback.append(sep)
+        .append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim));
+      sep = " AND ";
+      prevRange = fallBackRange;
+      partCol = prevRange.getPartitionColumn();
+    }
+    if (!unsupportedTimeDims.isEmpty()) {
+      log.info("Not considering fact table:{} as it doesn't support time dimensions: {}", this.getFact(),
+        unsupportedTimeDims);
+      cubeql.addStoragePruningMsg(this, timeDimNotSupported(unsupportedTimeDims));
+      return false;
+    }
+    Set<String> nonExistingParts = missingParts.toSet(partColsQueried);
+    // TODO union : Relook at this.
+    nonExistingPartitions.addAll(nonExistingParts);
+    if (rangeParts.isEmpty() || (failOnPartialData && !nonExistingParts.isEmpty())) {
+      log.info("No partitions for fallback range:{}", timeRange);
+      return false;
+    }
+    String partWhere = rangeWriter
+      .getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()), rangeParts);
+    String extraWhere = extraWhereClauseFallback.toString();
+    if (!StringUtils.isEmpty(extraWhere)) {
+      rangeToWhere.put(timeRange, "((" + partWhere + ") and (" + extraWhere + "))");
+    } else {
+      rangeToWhere.put(timeRange, partWhere);
+    }
+    // Add all the partitions. storagePartitions contains all the partitions for previous time ranges also.
+    this.storagePartitions.addAll(rangeParts);
+    return true;
+  }
+
+  /**
+   * Checks the data completeness, over the given time range, of the queried measures (including measures used by
+   * queried expressions) that carry a data completeness tag. Values below the configured threshold are recorded in
+   * {@code incompleteMeasureData}.
+   *
+   * @param timeRange time range to check
+   * @return false if any tagged measure is below the completeness threshold for some date of the range. Returns true
+   *         otherwise, including when the check does not apply: it is disabled, the fact has no completeness tag,
+   *         no tagged measure is queried, or the range's partition column is not the completeness partition column
+   * @throws LensException propagated from the metastore client and the completeness checker
+   */
+  private boolean evaluateMeasuresCompleteness(TimeRange timeRange) throws LensException {
+    // Same gate the storage resolver applied before running the completeness check.
+    if (!client.isDataCompletenessCheckEnabled()) {
+      log.info("Data completeness check is not enabled, skipping it for the fact table:{}", fact);
+      return true;
+    }
+    String factDataCompletenessTag = fact.getDataCompletenessTag();
+    if (factDataCompletenessTag == null) {
+      log.info("Not checking completeness for the fact table:{} as the dataCompletenessTag is not set", fact);
+      return true;
+    }
+    Set<String> measureTag = new HashSet<>();
+    Map<String, String> tagToMeasureOrExprMap = new HashMap<>();
+
+    processMeasuresFromExprMeasures(cubeql, measureTag, tagToMeasureOrExprMap);
+
+    Set<String> measures = cubeql.getQueriedMsrs();
+    if (measures != null) {
+      for (String measure : measures) {
+        processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap);
+      }
+    }
+    // None of the queried measures carries a data completeness tag.
+    if (measureTag.isEmpty()) {
+      log.info("No Queried measures with the dataCompletenessTag, hence skipping the availability check");
+      return true;
+    }
+    if (!timeRange.getPartitionColumn().equals(completenessPartCol)) {
+      log.info("Completeness check not available for partCol:{}", timeRange.getPartitionColumn());
+      return true;
+    }
+    DataCompletenessChecker completenessChecker = client.getCompletenessChecker();
+    DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+    Map<String, Map<Date, Float>> completenessMap = completenessChecker
+      .getCompleteness(factDataCompletenessTag, timeRange.getFromDate(), timeRange.getToDate(), measureTag);
+    boolean isDataComplete = true;
+    if (completenessMap != null) {
+      for (Map.Entry<String, Map<Date, Float>> measureCompleteness : completenessMap.entrySet()) {
+        String tag = measureCompleteness.getKey();
+        for (Map.Entry<Date, Float> completenessResult : measureCompleteness.getValue().entrySet()) {
+          if (completenessResult.getValue() < completenessThreshold) {
+            log.info("Completeness for the measure_tag {} is {}, threshold: {}, for the hour {}", tag,
+              completenessResult.getValue(), completenessThreshold, formatter.format(completenessResult.getKey()));
+            String measureOrExprFromTag = tagToMeasureOrExprMap.get(tag);
+            Map<String, Float> incompletePartition = incompleteMeasureData.get(measureOrExprFromTag);
+            if (incompletePartition == null) {
+              incompletePartition = new HashMap<>();
+              incompleteMeasureData.put(measureOrExprFromTag, incompletePartition);
+            }
+            incompletePartition.put(formatter.format(completenessResult.getKey()), completenessResult.getValue());
+            isDataComplete = false;
+          }
+        }
+      }
+    }
+    return isDataComplete;
+  }
+
+  /**
+   * Finds the partitions of this storage table for a time range.
+   *
+   * @return the partitions found. The set is empty when the range is null, when it cannot be covered by
+   *         {@code updatePeriods}, or when this storage table cannot answer it
+   */
+  private Set<FactPartition> getPartitions(TimeRange timeRange, TreeSet<UpdatePeriod> updatePeriods,
+    boolean addNonExistingParts, boolean failOnPartialData, PartitionRangesForPartitionColumns missingParts)
+    throws LensException {
+    if (timeRange == null || !timeRange.isCoverableBy(updatePeriods)) {
+      return new TreeSet<>();
+    }
+    Set<FactPartition> partitions = new TreeSet<>();
+    boolean answerable = getPartitions(timeRange.getFromDate(), timeRange.getToDate(),
+      timeRange.getPartitionColumn(), partitions, updatePeriods, addNonExistingParts, failOnPartialData,
+      missingParts);
+    return answerable ? partitions : new TreeSet<FactPartition>();
+  }
+
+  /**
+   * @return a read-only view of the partitions collected by evaluateCompleteness() for all time ranges so far
+   */
+  @Override
+  public Set<FactPartition> getParticipatingPartitions() {
+    return Collections.unmodifiableSet(storagePartitions);
+  }
+
+  @Override
+  public boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr) {
+    return expr.isEvaluable(this);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof StorageCandidate)) {
+      return false;
+    }
+    StorageCandidate other = (StorageCandidate) obj;
+    // Assuming that the same cube and fact instances are used across StorageCandidates, so reference equality is
+    // used for them.
+    return this.cube == other.cube && this.fact == other.fact && this.storageName.equals(other.storageName);
+  }
+
+  /**
+   * Hashes the storage table name. This is consistent with equals(), because equal candidates share the fact and
+   * the storage name.
+   */
+  @Override
+  public int hashCode() {
+    return this.name.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return getName();
+  }
+
+  /**
+   * Adds an update period to the set of valid update periods used when looking up partitions.
+   */
+  public void addValidUpdatePeriod(UpdatePeriod updatePeriod) {
+    this.validUpdatePeriods.add(updatePeriod);
+  }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
index cdf6812..daab851 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -18,33 +18,28 @@
*/
package org.apache.lens.cube.parse;
+import static org.apache.lens.cube.metadata.MetastoreUtil.getFactOrDimtableStorageTableName;
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.TIMEDIM_NOT_SUPPORTED;
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE;
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.noCandidateStorages;
+import static org.apache.lens.cube.parse.StorageUtil.getFallbackRange;
+
import java.text.DateFormat;
-import java.text.ParseException;
import java.text.SimpleDateFormat;
-
import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.lens.cube.metadata.DateUtil.WSPACE;
-import static org.apache.lens.cube.metadata.MetastoreUtil.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCode.*;
import org.apache.lens.cube.metadata.*;
-import org.apache.lens.cube.parse.CandidateTablePruneCause.*;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCause;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCode;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.SkipUpdatePeriodCode;
import org.apache.lens.server.api.error.LensException;
-import org.apache.lens.server.api.metastore.*;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.util.ReflectionUtils;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
import lombok.extern.slf4j.Slf4j;
/**
@@ -57,36 +52,23 @@ class StorageTableResolver implements ContextRewriter {
private final Configuration conf;
private final List<String> supportedStorages;
private final boolean allStoragesSupported;
- CubeMetastoreClient client;
private final boolean failOnPartialData;
private final List<String> validDimTables;
private final Map<CubeFactTable, Map<UpdatePeriod, Set<String>>> validStorageMap = new HashMap<>();
- private String processTimePartCol = null;
private final UpdatePeriod maxInterval;
+ // TODO union : Remove this. All partitions are stored in the StorageCandidate.
private final Map<String, Set<String>> nonExistingPartitions = new HashMap<>();
+ CubeMetastoreClient client;
+ Map<String, List<String>> storagePartMap = new HashMap<String, List<String>>();
+ private String processTimePartCol = null;
private TimeRangeWriter rangeWriter;
private DateFormat partWhereClauseFormat = null;
private PHASE phase;
+ // TODO union : we do not need this. Remove the storage candidate
private HashMap<CubeFactTable, Map<String, SkipStorageCause>> skipStorageCausesPerFact;
private float completenessThreshold;
private String completenessPartCol;
- enum PHASE {
- FACT_TABLES, FACT_PARTITIONS, DIM_TABLE_AND_PARTITIONS;
-
- static PHASE first() {
- return values()[0];
- }
-
- static PHASE last() {
- return values()[values().length - 1];
- }
-
- PHASE next() {
- return values()[(this.ordinal() + 1) % values().length];
- }
- }
-
public StorageTableResolver(Configuration conf) {
this.conf = conf;
this.supportedStorages = getSupportedStorages(conf);
@@ -101,16 +83,16 @@ class StorageTableResolver implements ContextRewriter {
} else {
this.maxInterval = null;
}
- rangeWriter =
- ReflectionUtils.newInstance(conf.getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS,
- CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER, TimeRangeWriter.class), this.conf);
+ rangeWriter = ReflectionUtils.newInstance(conf
+ .getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS, CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER,
+ TimeRangeWriter.class), this.conf);
String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT);
if (formatStr != null) {
partWhereClauseFormat = new SimpleDateFormat(formatStr);
}
this.phase = PHASE.first();
- completenessThreshold = conf.getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD,
- CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD);
+ completenessThreshold = conf
+ .getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD);
completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL);
}
@@ -122,36 +104,23 @@ class StorageTableResolver implements ContextRewriter {
return null;
}
- public boolean isStorageSupported(String storage) {
+ /**
+ * Checks whether a storage may be used. That is the case when all storages are supported, or when the storage
+ * is in the supported storages list built from the configuration.
+ *
+ * @param storage storage name
+ * @return true if the storage is supported
+ */
+ public boolean isStorageSupportedOnDriver(String storage) {
return allStoragesSupported || supportedStorages.contains(storage);
}
- Map<String, List<String>> storagePartMap = new HashMap<String, List<String>>();
-
@Override
public void rewriteContext(CubeQueryContext cubeql) throws LensException {
client = cubeql.getMetastoreClient();
switch (phase) {
- case FACT_TABLES:
- if (!cubeql.getCandidateFacts().isEmpty()) {
- // resolve storage table names
- resolveFactStorageTableNames(cubeql);
+ case STORAGE_TABLES:
+ if (!cubeql.getCandidates().isEmpty()) {
+ resolveStorageTable(cubeql);
}
- cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES);
break;
- case FACT_PARTITIONS:
- if (!cubeql.getCandidateFacts().isEmpty()) {
- // resolve storage partitions
- resolveFactStoragePartitions(cubeql);
- }
- cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES);
- if (client != null && client.isDataCompletenessCheckEnabled()) {
- if (!cubeql.getCandidateFacts().isEmpty()) {
- // resolve incomplete fact partition
- resolveFactCompleteness(cubeql);
- }
- cubeql.pruneCandidateFactSet(CandidateTablePruneCode.INCOMPLETE_PARTITION);
+ case STORAGE_PARTITIONS:
+ if (!cubeql.getCandidates().isEmpty()) {
+ resolveStoragePartitions(cubeql);
}
break;
case DIM_TABLE_AND_PARTITIONS:
@@ -162,13 +131,32 @@ class StorageTableResolver implements ContextRewriter {
cubeql.getAutoJoinCtx().pruneAllPathsForCandidateDims(cubeql.getCandidateDimTables());
cubeql.getAutoJoinCtx().refreshJoinPathColumns();
}
+ // TODO union : What is this? We may not need this, as non existing partitions are stored in StorageCandidate
+ cubeql.setNonexistingParts(nonExistingPartitions);
break;
}
- //Doing this on all three phases. Keep updating cubeql with the current identified missing partitions.
- cubeql.setNonexistingParts(nonExistingPartitions);
phase = phase.next();
}
+  /**
+   * Evaluates every query candidate against every queried time range, so that each candidate collects the
+   * partitions it needs to answer the query.
+   *
+   * @param cubeql query context holding the candidates and the time ranges
+   * @throws LensException propagated from {@link Candidate#evaluateCompleteness(TimeRange, boolean)}
+   */
+  private void resolveStoragePartitions(CubeQueryContext cubeql) throws LensException {
+    for (Candidate candidate : cubeql.getCandidates()) {
+      boolean completeForAllRanges = true;
+      for (TimeRange queriedRange : cubeql.getTimeRanges()) {
+        // Every range is evaluated (no short-circuit), so partitions are gathered for all of them.
+        boolean completeForRange = candidate.evaluateCompleteness(queriedRange, failOnPartialData);
+        completeForAllRanges = completeForAllRanges && completeForRange;
+      }
+      if (!completeForAllRanges) {
+        // TODO union : Prune this candidate?
+      }
+    }
+  }
+
private void resolveDimStorageTablesAndPartitions(CubeQueryContext cubeql) throws LensException {
Set<Dimension> allDims = new HashSet<Dimension>(cubeql.getDimensions());
for (Aliased<Dimension> dim : cubeql.getOptionalDimensions()) {
@@ -184,8 +172,8 @@ class StorageTableResolver implements ContextRewriter {
CandidateDim candidate = i.next();
CubeDimensionTable dimtable = candidate.dimtable;
if (dimtable.getStorages().isEmpty()) {
- cubeql.addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause(
- CandidateTablePruneCode.MISSING_STORAGES));
+ cubeql
+ .addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES));
i.remove();
continue;
}
@@ -194,7 +182,7 @@ class StorageTableResolver implements ContextRewriter {
boolean foundPart = false;
Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>();
for (String storage : dimtable.getStorages()) {
- if (isStorageSupported(storage)) {
+ if (isStorageSupportedOnDriver(storage)) {
String tableName = getFactOrDimtableStorageTableName(dimtable.getName(), storage).toLowerCase();
if (validDimTables != null && !validDimTables.contains(tableName)) {
log.info("Not considering dim storage table:{} as it is not a valid dim storage", tableName);
@@ -212,9 +200,8 @@ class StorageTableResolver implements ContextRewriter {
}
if (!failOnPartialData || foundPart) {
storageTables.add(tableName);
- String whereClause =
- StorageUtil.getWherePartClause(dim.getTimedDimension(), null,
- StorageConstants.getPartitionsForLatest());
+ String whereClause = StorageUtil
+ .getWherePartClause(dim.getTimedDimension(), null, StorageConstants.getPartitionsForLatest());
whereClauses.put(tableName, whereClause);
} else {
log.info("Not considering dim storage table:{} as no dim partitions exist", tableName);
@@ -239,78 +226,115 @@ class StorageTableResolver implements ContextRewriter {
continue;
}
// pick the first storage table
- candidate.setStorageTable(storageTables.iterator().next());
- candidate.setWhereClause(whereClauses.get(candidate.getStorageTable()));
+ candidate.setStorageName(storageTables.iterator().next());
+ candidate.setWhereClause(whereClauses.get(candidate.getStorageName()));
}
}
}
-  // Resolves all the storage table names, which are valid for each updatePeriod
-  private void resolveFactStorageTableNames(CubeQueryContext cubeql) throws LensException {
-    Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
-    skipStorageCausesPerFact = new HashMap<>();
-    while (i.hasNext()) {
-      CubeFactTable fact = i.next().fact;
-      if (fact.getUpdatePeriods().isEmpty()) {
-        cubeql.addFactPruningMsgs(fact, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES));
-        i.remove();
+  /**
+   * Prunes storage candidates from the query context. A storage candidate is removed when:
+   * 1. The storage is not supported by the driver.
+   * 2. The storage is not in the configured list of valid storage tables (compared case-insensitively).
+   * 3. The storage cannot answer any queried time range, directly or through a fallback range.
+   * 4. The storage has no valid update period.
+   *
+   * For each surviving {@link StorageCandidate} this method also records its valid update periods.
+   *
+   * TODO union : Do fourth point before 3.
+   */
+  private void resolveStorageTable(CubeQueryContext cubeql) throws LensException {
+    Iterator<Candidate> it = cubeql.getCandidates().iterator();
+    while (it.hasNext()) {
+      Candidate c = it.next();
+      assert (c instanceof StorageCandidate);
+      StorageCandidate sc = (StorageCandidate) c;
+      String storageTable = sc.getStorageName();
+      if (!isStorageSupportedOnDriver(storageTable)) {
+        log.info("Skipping storage: {} as it is not supported", storageTable);
+        cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.UNSUPPORTED_STORAGE));
+        it.remove();
+        continue;
+      }
+      String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(sc.getFact().getName()));
+      List<String> validFactStorageTables =
+          StringUtils.isBlank(str) ? null : Arrays.asList(StringUtils.split(str.toLowerCase(), ","));
+      // Skip storage tables absent from the configured valid list (the list is lower-cased, so compare lower-case).
+      if (validFactStorageTables != null && !validFactStorageTables.contains(storageTable.toLowerCase())) {
+        log.info("Skipping storage table {} as it is not valid", storageTable);
+        cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.INVALID_STORAGE));
+        it.remove();
        continue;
      }
-      Map<UpdatePeriod, Set<String>> storageTableMap = new TreeMap<UpdatePeriod, Set<String>>();
-      validStorageMap.put(fact, storageTableMap);
-      String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(fact.getName()));
-      List<String> validFactStorageTables =
-          StringUtils.isBlank(str) ? null : Arrays.asList(StringUtils.split(str.toLowerCase(), ","));
-      Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>();
-      for (Map.Entry<String, Set<UpdatePeriod>> entry : fact.getUpdatePeriods().entrySet()) {
-        String storage = entry.getKey();
-        // skip storages that are not supported
-        if (!isStorageSupported(storage)) {
-          log.info("Skipping storage: {} as it is not supported", storage);
-          skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.UNSUPPORTED));
-          continue;
+      boolean valid = false;
+      Set<CandidateTablePruneCause.CandidateTablePruneCode> codes = new HashSet<>();
+      for (TimeRange range : cubeql.getTimeRanges()) {
+        boolean columnInRange = client
+            .isStorageTableCandidateForRange(storageTable, range.getFromDate(), range.getToDate());
+        boolean partitionColumnExists = client.partColExists(storageTable, range.getPartitionColumn());
+        valid = columnInRange && partitionColumnExists;
+        if (valid) {
+          break;
        }
-        String table = getStorageTableName(fact, storage, validFactStorageTables);
-        // skip the update period if the storage is not valid
-        if (table == null) {
-          skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.INVALID));
+        if (!columnInRange) {
+          codes.add(TIME_RANGE_NOT_ANSWERABLE);
          continue;
        }
-        List<String> validUpdatePeriods =
-          CubeQueryConfUtil.getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(fact.getName(), storage));
-
-        boolean isStorageAdded = false;
-        Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<String, SkipUpdatePeriodCode>();
-        for (UpdatePeriod updatePeriod : entry.getValue()) {
-          if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) {
-            log.info("Skipping update period {} for fact {}", updatePeriod, fact);
-            skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.QUERY_INTERVAL_BIGGER);
+        // Partition column missing on this storage table: try a fallback range via the related time dimension.
+        if (!partitionColumnExists) {
+          String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(range.getPartitionColumn());
+          if (!sc.getFact().getColumns().contains(timeDim)) {
+            // The fact lacks the time dimension column, so it cannot support this range; no fallback.
+            codes.add(TIMEDIM_NOT_SUPPORTED);
            continue;
          }
-          if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) {
-            log.info("Skipping update period {} for fact {} for storage {}", updatePeriod, fact, storage);
-            skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID);
+          TimeRange fallBackRange = getFallbackRange(range, sc.getFact().getCubeName(), cubeql);
+          if (fallBackRange == null) {
+            log.info("No partitions for range:{}. fallback range: {}", range, fallBackRange);
+            codes.add(TIME_RANGE_NOT_ANSWERABLE);
            continue;
          }
-          Set<String> storageTables = storageTableMap.get(updatePeriod);
-          if (storageTables == null) {
-            storageTables = new LinkedHashSet<>();
-            storageTableMap.put(updatePeriod, storageTables);
+          valid = client.isStorageTableCandidateForRange(storageTable, fallBackRange.getFromDate(),
+              fallBackRange.getToDate()) && client.partColExists(storageTable, fallBackRange.getPartitionColumn());
+          if (valid) {
+            break;
+          } else {
+            codes.add(TIME_RANGE_NOT_ANSWERABLE);
          }
-          isStorageAdded = true;
-          log.debug("Adding storage table:{} for fact:{} for update period {}", table, fact, updatePeriod);
-          storageTables.add(table);
        }
-        if (!isStorageAdded) {
-          skipStorageCauses.put(storage, SkipStorageCause.noCandidateUpdatePeriod(skipUpdatePeriodCauses));
+      }
+      if (!valid) {
+        it.remove();
+        for (CandidateTablePruneCode code : codes) {
+          cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(code));
+        }
+        continue;
+      }
+      // Keep only update periods within maxInterval that are also in the configured valid list, if any.
+      List<String> validUpdatePeriods = CubeQueryConfUtil
+          .getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(sc.getFact().getName(), storageTable));
+      boolean isStorageAdded = false;
+      Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<>();
+      // NOTE(review): getUpdatePeriods() is keyed by storage name, while client.* calls above expect a
+      // storage table name; confirm what sc.getStorageName() returns, as a missing key here throws NPE.
+      for (UpdatePeriod updatePeriod : sc.getFact().getUpdatePeriods().get(storageTable)) {
+        if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) {
+          log.info("Skipping update period {} for fact {}", updatePeriod, sc.getFact());
+          skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.QUERY_INTERVAL_BIGGER);
+          continue;
+        }
+        if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) {
+          log.info("Skipping update period {} for fact {} for storage {}", updatePeriod, sc.getFact(), storageTable);
+          skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID);
+          continue;
        }
+        isStorageAdded = true;
+        sc.addValidUpdatePeriod(updatePeriod);
      }
-      skipStorageCausesPerFact.put(fact, skipStorageCauses);
-      if (storageTableMap.isEmpty()) {
-        log.info("Not considering fact table:{} as it does not have any storage tables", fact);
-        cubeql.addFactPruningMsgs(fact, noCandidateStorages(skipStorageCauses));
-        i.remove();
+      if (!isStorageAdded) {
+        cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.updatePeriodsRejected(skipUpdatePeriodCauses));
+        it.remove();
      }
    }
  }
@@ -321,7 +345,7 @@ class StorageTableResolver implements ContextRewriter {
return set;
}
- String getStorageTableName(CubeFactTable fact, String storage, List<String> validFactStorageTables) {
+ private String getStorageTableName(CubeFactTable fact, String storage, List<String> validFactStorageTables) {
String tableName = getFactOrDimtableStorageTableName(fact.getName(), storage).toLowerCase();
if (validFactStorageTables != null && !validFactStorageTables.contains(tableName)) {
log.info("Skipping storage table {} as it is not valid", tableName);
@@ -330,507 +354,12 @@ class StorageTableResolver implements ContextRewriter {
return tableName;
}
- private TimeRange getFallbackRange(TimeRange range, CandidateFact cfact, CubeQueryContext cubeql)
- throws LensException {
- Cube baseCube = cubeql.getBaseCube();
- ArrayList<String> tableNames = Lists.newArrayList(cfact.fact.getName(), cubeql.getCube().getName());
- if (!cubeql.getCube().getName().equals(baseCube.getName())) {
- tableNames.add(baseCube.getName());
- }
- String fallBackString = null;
- String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn());
- for (String tableName : tableNames) {
- fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters()
- .get(MetastoreConstants.TIMEDIM_RELATION + timedim);
- if (StringUtils.isNotBlank(fallBackString)) {
- break;
- }
- }
- if (StringUtils.isBlank(fallBackString)) {
- return null;
- }
- Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, ""));
- if (!matcher.matches()) {
- return null;
- }
- DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim());
- DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim());
- String relatedTimeDim = matcher.group(1).trim();
- String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim);
- return TimeRange.getBuilder()
- .fromDate(diff2.negativeOffsetFrom(range.getFromDate()))
- .toDate(diff1.negativeOffsetFrom(range.getToDate()))
- .partitionColumn(fallbackPartCol).build();
- }
-
- private void resolveFactStoragePartitions(CubeQueryContext cubeql) throws LensException {
- // Find candidate tables wrt supported storages
- Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
- while (i.hasNext()) {
- CandidateFact cfact = i.next();
- Map<TimeRange, String> whereClauseForFallback = new LinkedHashMap<TimeRange, String>();
- List<FactPartition> answeringParts = new ArrayList<>();
- Map<String, SkipStorageCause> skipStorageCauses = skipStorageCausesPerFact.get(cfact.fact);
- if (skipStorageCauses == null) {
- skipStorageCauses = new HashMap<>();
- }
- PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns();
- boolean noPartsForRange = false;
- Set<String> unsupportedTimeDims = Sets.newHashSet();
- Set<String> partColsQueried = Sets.newHashSet();
- for (TimeRange range : cubeql.getTimeRanges()) {
- partColsQueried.add(range.getPartitionColumn());
- StringBuilder extraWhereClause = new StringBuilder();
- Set<FactPartition> rangeParts = getPartitions(cfact.fact, range, skipStorageCauses, missingParts);
- // If no partitions were found, then we'll fallback.
- String partCol = range.getPartitionColumn();
- boolean partColNotSupported = rangeParts.isEmpty();
- for (String storage : cfact.fact.getStorages()) {
- String storageTableName = getFactOrDimtableStorageTableName(cfact.fact.getName(), storage).toLowerCase();
- partColNotSupported &= skipStorageCauses.containsKey(storageTableName)
- && skipStorageCauses.get(storageTableName).getCause().equals(PART_COL_DOES_NOT_EXIST)
- && skipStorageCauses.get(storageTableName).getNonExistantPartCols().contains(partCol);
- }
- TimeRange prevRange = range;
- String sep = "";
- while (rangeParts.isEmpty()) {
- // TODO: should we add a condition whether on range's partcol any missing partitions are not there
- String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol);
- if (partColNotSupported && !cfact.getColumns().contains(timeDim)) {
- unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(range.getPartitionColumn()));
- break;
- }
- TimeRange fallBackRange = getFallbackRange(prevRange, cfact, cubeql);
- log.info("No partitions for range:{}. fallback range: {}", range, fallBackRange);
- if (fallBackRange == null) {
- break;
- }
- partColsQueried.add(fallBackRange.getPartitionColumn());
- rangeParts = getPartitions(cfact.fact, fallBackRange, skipStorageCauses, missingParts);
- extraWhereClause.append(sep)
- .append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim));
- sep = " AND ";
- prevRange = fallBackRange;
- partCol = prevRange.getPartitionColumn();
- if (!rangeParts.isEmpty()) {
- break;
- }
- }
- whereClauseForFallback.put(range, extraWhereClause.toString());
- if (rangeParts.isEmpty()) {
- log.info("No partitions for fallback range:{}", range);
- noPartsForRange = true;
- continue;
- }
- // If multiple storage tables are part of the same fact,
- // capture range->storage->partitions
- Map<String, LinkedHashSet<FactPartition>> tablePartMap = new HashMap<String, LinkedHashSet<FactPartition>>();
- for (FactPartition factPart : rangeParts) {
- for (String table : factPart.getStorageTables()) {
- if (!tablePartMap.containsKey(table)) {
- tablePartMap.put(table, new LinkedHashSet<>(Collections.singletonList(factPart)));
- } else {
- LinkedHashSet<FactPartition> storagePart = tablePartMap.get(table);
- storagePart.add(factPart);
- }
- }
- }
- cfact.getRangeToStoragePartMap().put(range, tablePartMap);
- cfact.incrementPartsQueried(rangeParts.size());
- answeringParts.addAll(rangeParts);
- cfact.getPartsQueried().addAll(rangeParts);
- }
- if (!unsupportedTimeDims.isEmpty()) {
- log.info("Not considering fact table:{} as it doesn't support time dimensions: {}", cfact.fact,
- unsupportedTimeDims);
- cubeql.addFactPruningMsgs(cfact.fact, timeDimNotSupported(unsupportedTimeDims));
- i.remove();
- continue;
- }
- Set<String> nonExistingParts = missingParts.toSet(partColsQueried);
- if (!nonExistingParts.isEmpty()) {
- addNonExistingParts(cfact.fact.getName(), nonExistingParts);
- }
- if (cfact.getNumQueriedParts() == 0 || (failOnPartialData && (noPartsForRange || !nonExistingParts.isEmpty()))) {
- log.info("Not considering fact table:{} as it could not find partition for given ranges: {}", cfact.fact,
- cubeql.getTimeRanges());
- /*
- * This fact is getting discarded because of any of following reasons:
- * 1. Has missing partitions
- * 2. All Storage tables were skipped for some reasons.
- * 3. Storage tables do not have the update period for the timerange queried.
- */
- if (failOnPartialData && !nonExistingParts.isEmpty()) {
- cubeql.addFactPruningMsgs(cfact.fact, missingPartitions(nonExistingParts));
- } else if (!skipStorageCauses.isEmpty()) {
- CandidateTablePruneCause cause = noCandidateStorages(skipStorageCauses);
- cubeql.addFactPruningMsgs(cfact.fact, cause);
- } else {
- CandidateTablePruneCause cause =
- new CandidateTablePruneCause(NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE);
- cubeql.addFactPruningMsgs(cfact.fact, cause);
- }
- i.remove();
- continue;
- }
- // Map from storage to covering parts
- Map<String, Set<FactPartition>> minimalStorageTables = new LinkedHashMap<String, Set<FactPartition>>();
- StorageUtil.getMinimalAnsweringTables(answeringParts, minimalStorageTables);
- if (minimalStorageTables.isEmpty()) {
- log.info("Not considering fact table:{} as it does not have any storage tables", cfact);
- cubeql.addFactPruningMsgs(cfact.fact, noCandidateStorages(skipStorageCauses));
- i.remove();
- continue;
- }
- Set<String> storageTables = new LinkedHashSet<>();
- storageTables.addAll(minimalStorageTables.keySet());
- cfact.setStorageTables(storageTables);
- // Update range->storage->partitions with time range where clause
- for (TimeRange trange : cfact.getRangeToStoragePartMap().keySet()) {
- Map<String, String> rangeToWhere = new HashMap<>();
- for (Map.Entry<String, Set<FactPartition>> entry : minimalStorageTables.entrySet()) {
- String table = entry.getKey();
- Set<FactPartition> minimalParts = entry.getValue();
-
- LinkedHashSet<FactPartition> rangeParts = cfact.getRangeToStoragePartMap().get(trange).get(table);
- LinkedHashSet<FactPartition> minimalPartsCopy = Sets.newLinkedHashSet();
-
- if (rangeParts != null) {
- minimalPartsCopy.addAll(minimalParts);
- minimalPartsCopy.retainAll(rangeParts);
- }
- if (!StringUtils.isEmpty(whereClauseForFallback.get(trange))) {
- rangeToWhere.put(table, "(("
- + rangeWriter.getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()),
- minimalPartsCopy) + ") and (" + whereClauseForFallback.get(trange) + "))");
- } else {
- rangeToWhere.put(table, rangeWriter.getTimeRangeWhereClause(cubeql,
- cubeql.getAliasForTableName(cubeql.getCube().getName()), minimalPartsCopy));
- }
- }
- cfact.getRangeToStorageWhereMap().put(trange, rangeToWhere);
- }
- log.info("Resolved partitions for fact {}: {} storageTables:{}", cfact, answeringParts, storageTables);
- }
- }
-
- private static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias,
- Set<String> measureTag,
- Map<String, String> tagToMeasureOrExprMap) {
- CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol);
- if (column != null && column.getTags() != null) {
- String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG);
- //Checking if dataCompletenessTag is set for queried measure
- if (dataCompletenessTag != null) {
- measureTag.add(dataCompletenessTag);
- String value = tagToMeasureOrExprMap.get(dataCompletenessTag);
- if (value == null) {
- tagToMeasureOrExprMap.put(dataCompletenessTag, alias);
- } else {
- value = value.concat(",").concat(alias);
- tagToMeasureOrExprMap.put(dataCompletenessTag, value);
- }
- return true;
- }
- }
- return false;
- }
-
- private static void processMeasuresFromExprMeasures(CubeQueryContext cubeql, Set<String> measureTag,
- Map<String, String> tagToMeasureOrExprMap) {
- boolean isExprProcessed;
- String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName());
- for (String expr : cubeql.getQueriedExprsWithMeasures()) {
- isExprProcessed = false;
- for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias)
- .getAllExprs()) {
- if (esc.getTblAliasToColumns().get(cubeAlias) != null) {
- for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) {
- if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) {
- /* This is done to associate the expression with one of the dataCompletenessTag for the measures.
- So, even if the expression is composed of measures with different dataCompletenessTags, we will be
- determining the dataCompleteness from one of the measure and this expression is grouped with the
- other queried measures that have the same dataCompletenessTag. */
- isExprProcessed = true;
- break;
- }
- }
- }
- if (isExprProcessed) {
- break;
- }
- }
- }
- }
-
- private void resolveFactCompleteness(CubeQueryContext cubeql) throws LensException {
- if (client == null || client.getCompletenessChecker() == null || completenessPartCol == null) {
- return;
- }
- DataCompletenessChecker completenessChecker = client.getCompletenessChecker();
- Set<String> measureTag = new HashSet<>();
- Map<String, String> tagToMeasureOrExprMap = new HashMap<>();
-
- processMeasuresFromExprMeasures(cubeql, measureTag, tagToMeasureOrExprMap);
-
- Set<String> measures = cubeql.getQueriedMsrs();
- if (measures == null) {
- measures = new HashSet<>();
- }
- for (String measure : measures) {
- processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap);
- }
- //Checking if dataCompletenessTag is set for the fact
- if (measureTag.isEmpty()) {
- log.info("No Queried measures with the dataCompletenessTag, hence skipping the availability check");
- return;
- }
- Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
- DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
- while (i.hasNext()) {
- CandidateFact cFact = i.next();
- // Map from measure to the map from partition to %completeness
- Map<String, Map<String, Float>> incompleteMeasureData = new HashMap<>();
-
- String factDataCompletenessTag = cFact.fact.getDataCompletenessTag();
- if (factDataCompletenessTag == null) {
- log.info("Not checking completeness for the fact table:{} as the dataCompletenessTag is not set", cFact.fact);
- continue;
- }
- boolean isFactDataIncomplete = false;
- for (TimeRange range : cubeql.getTimeRanges()) {
- if (!range.getPartitionColumn().equals(completenessPartCol)) {
- log.info("Completeness check not available for partCol:{}", range.getPartitionColumn());
- continue;
- }
- Date from = range.getFromDate();
- Date to = range.getToDate();
- Map<String, Map<Date, Float>> completenessMap = completenessChecker.getCompleteness(factDataCompletenessTag,
- from, to, measureTag);
- if (completenessMap != null && !completenessMap.isEmpty()) {
- for (Map.Entry<String, Map<Date, Float>> measureCompleteness : completenessMap.entrySet()) {
- String tag = measureCompleteness.getKey();
- for (Map.Entry<Date, Float> completenessResult : measureCompleteness.getValue().entrySet()) {
- if (completenessResult.getValue() < completenessThreshold) {
- log.info("Completeness for the measure_tag {} is {}, threshold: {}, for the hour {}", tag,
- completenessResult.getValue(), completenessThreshold,
- formatter.format(completenessResult.getKey()));
- String measureorExprFromTag = tagToMeasureOrExprMap.get(tag);
- Map<String, Float> incompletePartition = incompleteMeasureData.get(measureorExprFromTag);
- if (incompletePartition == null) {
- incompletePartition = new HashMap<>();
- incompleteMeasureData.put(measureorExprFromTag, incompletePartition);
- }
- incompletePartition.put(formatter.format(completenessResult.getKey()), completenessResult.getValue());
- isFactDataIncomplete = true;
- }
- }
- }
- }
- }
- if (isFactDataIncomplete) {
- log.info("Fact table:{} has partitions with incomplete data: {} for given ranges: {}", cFact.fact,
- incompleteMeasureData, cubeql.getTimeRanges());
- if (failOnPartialData) {
- i.remove();
- cubeql.addFactPruningMsgs(cFact.fact, incompletePartitions(incompleteMeasureData));
- } else {
- cFact.setDataCompletenessMap(incompleteMeasureData);
- }
- }
- }
- }
-
void addNonExistingParts(String name, Set<String> nonExistingParts) {
nonExistingPartitions.put(name, nonExistingParts);
}
- private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range,
- Map<String, SkipStorageCause> skipStorageCauses,
- PartitionRangesForPartitionColumns missingPartitions) throws LensException {
- try {
- return getPartitions(fact, range, getValidUpdatePeriods(fact), true, failOnPartialData, skipStorageCauses,
- missingPartitions);
- } catch (Exception e) {
- throw new LensException(e);
- }
- }
-
- private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range, TreeSet<UpdatePeriod> updatePeriods,
- boolean addNonExistingParts, boolean failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses,
- PartitionRangesForPartitionColumns missingPartitions)
- throws Exception {
- Set<FactPartition> partitions = new TreeSet<>();
- if (range != null && range.isCoverableBy(updatePeriods)
- && getPartitions(fact, range.getFromDate(), range.getToDate(), range.getPartitionColumn(), partitions,
- updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions)) {
- return partitions;
- } else {
- return new TreeSet<>();
- }
- }
-
- private boolean getPartitions(CubeFactTable fact, Date fromDate, Date toDate, String partCol,
- Set<FactPartition> partitions, TreeSet<UpdatePeriod> updatePeriods,
- boolean addNonExistingParts, boolean failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses,
- PartitionRangesForPartitionColumns missingPartitions)
- throws Exception {
- log.info("getPartitions for {} from fromDate:{} toDate:{}", fact, fromDate, toDate);
- if (fromDate.equals(toDate) || fromDate.after(toDate)) {
- return true;
- }
- UpdatePeriod interval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods);
- if (interval == null) {
- log.info("No max interval for range: {} to {}", fromDate, toDate);
- return false;
- }
- log.debug("Max interval for {} is: {}", fact, interval);
- Set<String> storageTbls = new LinkedHashSet<String>();
- storageTbls.addAll(validStorageMap.get(fact).get(interval));
-
- if (interval == UpdatePeriod.CONTINUOUS && rangeWriter.getClass().equals(BetweenTimeRangeWriter.class)) {
- for (String storageTbl : storageTbls) {
- FactPartition part = new FactPartition(partCol, fromDate, interval, null, partWhereClauseFormat);
- partitions.add(part);
- part.getStorageTables().add(storageTbl);
- part = new FactPartition(partCol, toDate, interval, null, partWhereClauseFormat);
- partitions.add(part);
- part.getStorageTables().add(storageTbl);
- log.info("Added continuous fact partition for storage table {}", storageTbl);
- }
- return true;
- }
-
- Iterator<String> it = storageTbls.iterator();
- while (it.hasNext()) {
- String storageTableName = it.next();
- if (!client.isStorageTableCandidateForRange(storageTableName, fromDate, toDate)) {
- skipStorageCauses.put(storageTableName, new SkipStorageCause(RANGE_NOT_ANSWERABLE));
- it.remove();
- } else if (!client.partColExists(storageTableName, partCol)) {
- log.info("{} does not exist in {}", partCol, storageTableName);
- skipStorageCauses.put(storageTableName, SkipStorageCause.partColDoesNotExist(partCol));
- it.remove();
- }
- }
-
- if (storageTbls.isEmpty()) {
- return false;
- }
- Date ceilFromDate = DateUtil.getCeilDate(fromDate, interval);
- Date floorToDate = DateUtil.getFloorDate(toDate, interval);
-
- int lookAheadNumParts =
- conf.getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(interval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS);
-
- TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, interval, 1).iterator();
- // add partitions from ceilFrom to floorTo
- while (iter.hasNext()) {
- Date dt = iter.next();
- Date nextDt = iter.peekNext();
- FactPartition part = new FactPartition(partCol, dt, interval, null, partWhereClauseFormat);
- log.debug("candidate storage tables for searching partitions: {}", storageTbls);
- updateFactPartitionStorageTablesFrom(fact, part, storageTbls);
- log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables());
- if (part.isFound()) {
- log.debug("Adding existing partition {}", part);
- partitions.add(part);
- log.debug("Looking for look ahead process time partitions for {}", part);
- if (processTimePartCol == null) {
- log.debug("processTimePartCol is null");
- } else if (partCol.equals(processTimePartCol)) {
- log.debug("part column is process time col");
- } else if (updatePeriods.first().equals(interval)) {
- log.debug("Update period is the least update period");
- } else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) {
- // see if this is the part of the last-n look ahead partitions
- log.debug("Not a look ahead partition");
- } else {
- log.debug("Looking for look ahead process time partitions for {}", part);
- // check if finer partitions are required
- // final partitions are required if no partitions from
- // look-ahead
- // process time are present
- TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts,
- interval, 1).iterator();
- while (processTimeIter.hasNext()) {
- Date pdt = processTimeIter.next();
- Date nextPdt = processTimeIter.peekNext();
- FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, interval, null,
- partWhereClauseFormat);
- updateFactPartitionStorageTablesFrom(fact, processTimePartition,
- part.getStorageTables());
- if (processTimePartition.isFound()) {
- log.debug("Finer parts not required for look-ahead partition :{}", part);
- } else {
- log.debug("Looked ahead process time partition {} is not found", processTimePartition);
- TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
- newset.addAll(updatePeriods);
- newset.remove(interval);
- log.debug("newset of update periods:{}", newset);
- if (!newset.isEmpty()) {
- // Get partitions for look ahead process time
- log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt);
- Set<FactPartition> processTimeParts =
- getPartitions(fact, TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn(
- processTimePartCol).build(), newset, true, false, skipStorageCauses, missingPartitions);
- log.debug("Look ahead partitions: {}", processTimeParts);
- TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build();
- for (FactPartition pPart : processTimeParts) {
- log.debug("Looking for finer partitions in pPart: {}", pPart);
- for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) {
- FactPartition innerPart = new FactPartition(partCol, date, pPart.getPeriod(), pPart,
- partWhereClauseFormat);
- updateFactPartitionStorageTablesFrom(fact, innerPart, pPart);
- if (innerPart.isFound()) {
- partitions.add(innerPart);
- }
- }
- log.debug("added all sub partitions blindly in pPart: {}", pPart);
- }
- }
- }
- }
- }
- } else {
- log.info("Partition:{} does not exist in any storage table", part);
- TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
- newset.addAll(updatePeriods);
- newset.remove(interval);
- if (!getPartitions(fact, dt, nextDt, partCol, partitions, newset, false, failOnPartialData, skipStorageCauses,
- missingPartitions)) {
-
- log.debug("Adding non existing partition {}", part);
- if (addNonExistingParts) {
- // Add non existing partitions for all cases of whether we populate all non existing or not.
- missingPartitions.add(part);
- if (!failOnPartialData) {
- Set<String> st = getStorageTablesWithoutPartCheck(part, storageTbls);
- if (st.isEmpty()) {
- log.info("No eligible storage tables");
- return false;
- }
- partitions.add(part);
- part.getStorageTables().addAll(st);
- }
- } else {
- log.info("No finer granual partitions exist for {}", part);
- return false;
- }
- } else {
- log.debug("Finer granual partitions added for {}", part);
- }
- }
- }
- return getPartitions(fact, fromDate, ceilFromDate, partCol, partitions,
- updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions)
- && getPartitions(fact, floorToDate, toDate, partCol, partitions,
- updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions);
- }
-
- private Set<String> getStorageTablesWithoutPartCheck(FactPartition part,
- Set<String> storageTableNames) throws LensException, HiveException {
+ private Set<String> getStorageTablesWithoutPartCheck(FactPartition part, Set<String> storageTableNames)
+ throws LensException, HiveException {
Set<String> validStorageTbls = new HashSet<>();
for (String storageTableName : storageTableNames) {
// skip all storage tables for which are not eligible for this partition
@@ -843,21 +372,19 @@ class StorageTableResolver implements ContextRewriter {
return validStorageTbls;
}
- private void updateFactPartitionStorageTablesFrom(CubeFactTable fact,
- FactPartition part, Set<String> storageTableNames) throws LensException, HiveException, ParseException {
- for (String storageTableName : storageTableNames) {
- // skip all storage tables for which are not eligible for this partition
- if (client.isStorageTablePartitionACandidate(storageTableName, part.getPartSpec())
- && (client.factPartitionExists(fact, part, storageTableName))) {
- part.getStorageTables().add(storageTableName);
- part.setFound(true);
- }
+ enum PHASE {
+ STORAGE_TABLES, STORAGE_PARTITIONS, DIM_TABLE_AND_PARTITIONS;
+
+ static PHASE first() {
+ return values()[0];
+ }
+
+ static PHASE last() {
+ return values()[values().length - 1];
}
- }
- private void updateFactPartitionStorageTablesFrom(CubeFactTable fact,
- FactPartition part, FactPartition pPart) throws LensException, HiveException, ParseException {
- updateFactPartitionStorageTablesFrom(fact, part, pPart.getStorageTables());
- part.setFound(part.isFound() && pPart.isFound());
+ PHASE next() {
+ return values()[(this.ordinal() + 1) % values().length];
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
index f9636d1..4f5d405 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -18,13 +18,19 @@
*/
package org.apache.lens.cube.parse;
+import static org.apache.lens.cube.metadata.DateUtil.WSPACE;
+
import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
-import org.apache.lens.cube.metadata.FactPartition;
-import org.apache.lens.cube.metadata.StorageConstants;
+import org.apache.lens.cube.metadata.*;
+import org.apache.lens.server.api.error.LensException;
import org.apache.commons.lang.StringUtils;
+import com.google.common.collect.Lists;
+
public final class StorageUtil {
private StorageUtil() {
@@ -69,8 +75,8 @@ public final class StorageUtil {
String sep = "";
for (String timePartCol : timedDimensions) {
if (!timePartCol.equals(partCol)) {
- sb.append(sep).append(alias).append(".").append(timePartCol)
- .append(" != '").append(StorageConstants.LATEST_PARTITION_VALUE).append("'");
+ sb.append(sep).append(alias).append(".").append(timePartCol).append(" != '")
+ .append(StorageConstants.LATEST_PARTITION_VALUE).append("'");
sep = " AND ";
}
}
@@ -82,15 +88,11 @@ public final class StorageUtil {
String sep = "((";
for (String clause : clauses) {
if (clause != null && !clause.isEmpty()) {
- sb
- .append(sep)
- .append(clause);
+ sb.append(sep).append(clause);
sep = ") AND (";
}
}
- return sb
- .append(sep.equals("((") ? "" : "))")
- .toString();
+ return sb.append(sep.equals("((") ? "" : "))").toString();
}
/**
@@ -161,4 +163,108 @@ public final class StorageUtil {
return null;
}
}
+
+  /**
+   * Computes a fallback range from the first TIMEDIM_RELATION parameter found on the fact, cube or base cube.
+   * @param range queried range; its partition column selects the time dim whose relation is looked up
+   * @param factName fact whose table parameters are checked first, before the cube and base cube
+   * @param cubeql query context supplying the cube, base cube and metastore client
+   * @return range on the related time dim's partition column (from offset by diff2, to by diff1), or null
+   * @throws LensException propagated from metastore / TimeDiff lookups (TODO confirm exact sources)
+   */
+  public static TimeRange getFallbackRange(TimeRange range, String factName, CubeQueryContext cubeql)
+    throws LensException {
+    Cube baseCube = cubeql.getBaseCube();
+    ArrayList<String> tableNames = Lists.newArrayList(factName, cubeql.getCube().getName());
+    if (!cubeql.getCube().getName().equals(baseCube.getName())) {
+      tableNames.add(baseCube.getName());
+    }
+    String fallBackString = null;
+    String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn());
+    for (String tableName : tableNames) {
+      fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters()
+        .get(MetastoreConstants.TIMEDIM_RELATION + timedim);
+      if (StringUtils.isNotBlank(fallBackString)) {
+        break;
+      }
+    }
+    if (StringUtils.isBlank(fallBackString)) {
+      return null;
+    }
+    Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, ""));
+    if (!matcher.matches()) {
+      return null;
+    }
+    DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim());
+    DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim());
+    String relatedTimeDim = matcher.group(1).trim();
+    String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim);
+    return TimeRange.getBuilder().fromDate(diff2.negativeOffsetFrom(range.getFromDate()))
+      .toDate(diff1.negativeOffsetFrom(range.getToDate())).partitionColumn(fallbackPartCol).build();
+  }
+
+  /**
+   * Records the data completeness tag of a queried cube measure, if it has one.
+   * See this: {@link org.apache.lens.server.api.metastore.DataCompletenessChecker}
+   * @param cubeql query context whose cube is used to look up {@code cubeCol} as a measure
+   * @param cubeCol name of the queried cube column
+   * @param alias name recorded against the tag (the measure or expression being processed)
+   * @param measureTag output set; the measure's completeness tag is added to it
+   * @param tagToMeasureOrExprMap output map from tag to comma separated aliases; {@code alias} is appended
+   * @return true if {@code cubeCol} is a measure with a data completeness tag, false otherwise
+   */
+  public static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias,
+    Set<String> measureTag, Map<String, String> tagToMeasureOrExprMap) {
+    CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol);
+    if (column != null && column.getTags() != null) {
+      String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG);
+      //Checking if dataCompletenessTag is set for queried measure
+      if (dataCompletenessTag != null) {
+        measureTag.add(dataCompletenessTag);
+        String value = tagToMeasureOrExprMap.get(dataCompletenessTag);
+        if (value == null) {
+          tagToMeasureOrExprMap.put(dataCompletenessTag, alias);
+        } else {
+          value = value.concat(",").concat(alias);
+          tagToMeasureOrExprMap.put(dataCompletenessTag, value);
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Groups each queried expression with measures under the completeness tag of its first tagged cube measure.
+   * @param cubeql query context providing the queried expressions and their expression contexts
+   * @param measureTag output set of data completeness tags
+   * @param tagToMeasureOrExprMap output map from tag to comma separated measure/expression names
+   */
+  public static void processMeasuresFromExprMeasures(CubeQueryContext cubeql, Set<String> measureTag,
+    Map<String, String> tagToMeasureOrExprMap) {
+    boolean isExprProcessed;
+    String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName());
+    for (String expr : cubeql.getQueriedExprsWithMeasures()) {
+      isExprProcessed = false;
+      for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias)
+        .getAllExprs()) {
+        if (esc.getTblAliasToColumns().get(cubeAlias) != null) {
+          for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) {
+            if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) {
+              /* This is done to associate the expression with one of the dataCompletenessTag for the measures.
+              So, even if the expression is composed of measures with different dataCompletenessTags, we will be
+              determining the dataCompleteness from one of the measure and this expression is grouped with the
+              other queried measures that have the same dataCompletenessTag. */
+              isExprProcessed = true;
+              break;
+            }
+          }
+        }
+        if (isExprProcessed) {
+          break;
+        }
+      }
+    }
+  }
}
+
[3/3] lens git commit: Initial changes for Union
Posted by pu...@apache.org.
Initial changes for Union
Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/b6f0cc3d
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/b6f0cc3d
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/b6f0cc3d
Branch: refs/heads/lens-1381
Commit: b6f0cc3d4d55342e811c55c06ad2c10b62a0feb9
Parents: fe66131
Author: Puneet Gupta,Sushil Mohanty and Lavkesh Lahngir <pu...@apache.org>
Authored: Wed Dec 21 16:58:23 2016 +0530
Committer: Puneet <pu...@inmobi.com>
Committed: Wed Dec 21 17:05:41 2016 +0530
----------------------------------------------------------------------
.../lens/cube/metadata/FactPartition.java | 2 +
.../lens/cube/parse/AggregateResolver.java | 21 +-
.../org/apache/lens/cube/parse/Candidate.java | 139 ++++
.../parse/CandidateCoveringSetsResolver.java | 259 ++++++
.../apache/lens/cube/parse/CandidateDim.java | 13 +-
.../apache/lens/cube/parse/CandidateFact.java | 11 +
.../apache/lens/cube/parse/CandidateTable.java | 7 +-
.../cube/parse/CandidateTablePruneCause.java | 60 +-
.../lens/cube/parse/CandidateTableResolver.java | 238 ++----
.../apache/lens/cube/parse/CandidateUtil.java | 208 +++++
.../lens/cube/parse/CubeQueryContext.java | 63 +-
.../lens/cube/parse/CubeQueryRewriter.java | 8 +
.../cube/parse/DenormalizationResolver.java | 27 +-
.../lens/cube/parse/ExpressionResolver.java | 36 +-
.../apache/lens/cube/parse/JoinCandidate.java | 119 +++
.../apache/lens/cube/parse/JoinResolver.java | 14 +-
.../lens/cube/parse/LightestFactResolver.java | 28 +-
.../cube/parse/MaxCoveringFactResolver.java | 4 +
.../org/apache/lens/cube/parse/PruneCauses.java | 6 +-
.../lens/cube/parse/QueriedPhraseContext.java | 58 +-
.../org/apache/lens/cube/parse/QueryAST.java | 2 +
.../lens/cube/parse/StorageCandidate.java | 560 +++++++++++++
.../lens/cube/parse/StorageTableResolver.java | 793 ++++---------------
.../org/apache/lens/cube/parse/StorageUtil.java | 128 ++-
.../lens/cube/parse/TimeRangeChecker.java | 23 +-
.../apache/lens/cube/parse/UnionCandidate.java | 247 ++++++
.../lens/cube/parse/UnionQueryWriter.java | 33 +
.../lens/cube/parse/join/AutoJoinContext.java | 12 +
.../apache/lens/driver/cube/RewriterPlan.java | 22 +-
.../apache/lens/cube/parse/CubeTestSetup.java | 116 +++
.../cube/parse/TestUnionAndJoinCandidates.java | 65 ++
31 files changed, 2385 insertions(+), 937 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java
index 1694b80..6a8e0c1 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java
@@ -39,6 +39,8 @@ public class FactPartition implements Comparable<FactPartition> {
private final Set<String> storageTables = new LinkedHashSet<String>();
@Getter
private final UpdatePeriod period;
+
+ //TODO union : this is never set . Do we need this ?s
@Getter
@Setter
private FactPartition containingPart;
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/AggregateResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/AggregateResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/AggregateResolver.java
index 9658100..79f38da 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/AggregateResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/AggregateResolver.java
@@ -71,21 +71,24 @@ class AggregateResolver implements ContextRewriter {
|| hasMeasuresNotInDefaultAggregates(cubeql, cubeql.getHavingAST(), null, aggregateResolverDisabled)
|| hasMeasures(cubeql, cubeql.getWhereAST()) || hasMeasures(cubeql, cubeql.getGroupByAST())
|| hasMeasures(cubeql, cubeql.getOrderByAST())) {
- Iterator<CandidateFact> factItr = cubeql.getCandidateFacts().iterator();
- while (factItr.hasNext()) {
- CandidateFact candidate = factItr.next();
- if (candidate.fact.isAggregated()) {
- cubeql.addFactPruningMsgs(candidate.fact,
- CandidateTablePruneCause.missingDefaultAggregate());
- factItr.remove();
+ //TODO union : Note : Pending : cube segmentation design may change the above assumption and Set<Candidate> can contain and mix of StorageCandidate and UnionSegmentCandidate. This step can then ignore UnionSegmentCandidate
+ Iterator<Candidate> candItr = cubeql.getCandidates().iterator();
+ while (candItr.hasNext()) {
+ Candidate candidate = candItr.next();
+ if (candidate instanceof StorageCandidate) {
+ StorageCandidate sc = (StorageCandidate) candidate;
+ if (sc.getFact().isAggregated()) {
+ cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.missingDefaultAggregate());
+ candItr.remove();
+ }
+ } else {
+ throw new LensException("Not a storage candidate!!");
}
}
nonDefaultAggregates = true;
log.info("Query has non default aggregates, no aggregate resolution will be done");
}
- cubeql.pruneCandidateFactSet(CandidateTablePruneCode.MISSING_DEFAULT_AGGREGATE);
-
if (nonDefaultAggregates || aggregateResolverDisabled) {
return;
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/Candidate.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/Candidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/Candidate.java
new file mode 100644
index 0000000..0d0ddb7
--- /dev/null
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/Candidate.java
@@ -0,0 +1,139 @@
+package org.apache.lens.cube.parse;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lens.cube.metadata.Dimension;
+import org.apache.lens.cube.metadata.FactPartition;
+import org.apache.lens.cube.metadata.TimeRange;
+import org.apache.lens.server.api.error.LensException;
+
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+
+/**
+ * This interface represents candidates that are involved in different phases of query rewriting.
+ * At the lowest level, Candidate is represented by a StorageCandidate that has a fact on a storage
+ * and other joined dimensions (if any) that are required to answer the query or part of the query.
+ * At a higher level Candidate can also be a Join or a Union Candidate representing join or union
+ * between other candidates.
+ *
+ * Different Re-writers will work on applicable candidates to produce a final candidate which will be used
+ * for generating the re-written query.
+ */
+public interface Candidate {
+
+  /**
+   * Returns String representation of this Candidate
+   * TODO decide if this method should be moved to QueryAST instead
+   *
+   * @return HQL string for this candidate
+   */
+  String toHQL();
+
+  /**
+   * Returns Query AST
+   *
+   * @return query AST of this candidate
+   */
+  QueryAST getQueryAst();
+
+  /**
+   * Returns all the fact columns
+   *
+   * @return fact columns of this candidate
+   */
+  Collection<String> getColumns();
+
+  /**
+   * Start Time for this candidate (calculated based on schema)
+   *
+   * @return start time of this candidate
+   */
+  Date getStartTime();
+
+  /**
+   * End Time for this candidate (calculated based on schema)
+   *
+   * @return end time of this candidate
+   */
+  Date getEndTime();
+
+  /**
+   * Returns the cost of this candidate
+   *
+   * @return cost of this candidate
+   */
+  double getCost();
+
+  /**
+   * Alias used for this candidate.
+   *
+   * @return alias of this candidate
+   */
+  String getAlias();
+
+  /**
+   * Returns true if this candidate contains the given candidate
+   *
+   * @param candidate candidate to look for
+   * @return true if this candidate contains {@code candidate}
+   */
+  boolean contains(Candidate candidate);
+
+  /**
+   * Returns child candidates of this candidate if any.
+   * Note: StorageCandidate will return null
+   * @return child candidates, or null for a StorageCandidate
+   */
+  Collection<Candidate> getChildren();
+
+
+  /**
+   * Calculates if this candidate can answer the query for given time range based on actual data registered with
+   * the underlying candidate storages. This method will also update any internal candidate data structures that are
+   * required for writing the re-written query and to answer {@link #getParticipatingPartitions()}.
+   *
+   * @param timeRange : TimeRange to check completeness for. TimeRange consists of start time, end time and the
+   * partition column
+   * @param failOnPartialData : fail fast if the candidate can answer the query only partially
+   * @return true if this Candidate can answer query for the given time range.
+   */
+  boolean evaluateCompleteness(TimeRange timeRange, boolean failOnPartialData)
+    throws LensException;
+
+  /**
+   * Returns the set of fact partitions that will participate in this candidate.
+   * Note: This method can be called only after call to
+   * {@link #evaluateCompleteness(TimeRange, boolean)}
+   *
+   * @return participating fact partitions
+   */
+  Set<FactPartition> getParticipatingPartitions();
+
+  /**
+   * Returns true if the given expression can be evaluated by this candidate.
+   * TODO union: in case of join, one of the candidates should be able to answer the measure expression
+   * TODO union: in case of union, all the candidates should answer the expression
+   *
+   * @param expr expression context to check
+   * @return true if {@code expr} is evaluable by this candidate
+   */
+  boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr);
+
+  // Moved to CandidateUtil boolean isValidForTimeRange(TimeRange timeRange);
+  // Moved to CandidateUtil boolean isExpressionAnswerable(ASTNode node, CubeQueryContext context) throws LensException;
+  // No callers: Set<String> getTimePartCols(CubeQueryContext query) throws LensException;
+
+  //TODO add methods to update AST in this candidate in this class or in CandidateUtil.
+  //void updateFromString(CubeQueryContext query) throws LensException;
+
+  //void updateASTs(CubeQueryContext cubeql) throws LensException;
+
+  //void addToHaving(ASTNode ast) throws LensException;
+
+  //Used in Having push down flow
+  //String addAndGetAliasFromSelect(ASTNode ast, AliasDecider aliasDecider);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java
new file mode 100644
index 0000000..e961427
--- /dev/null
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java
@@ -0,0 +1,259 @@
+package org.apache.lens.cube.parse;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lens.cube.error.LensCubeErrorCode;
+import org.apache.lens.cube.metadata.TimeRange;
+
+import org.apache.lens.server.api.error.LensException;
+
+import java.util.*;
+
+@Slf4j
+public class CandidateCoveringSetsResolver implements ContextRewriter {
+  // Builds candidate sets whose unions cover all queried time ranges and whose joins cover all queried measures.
+  private List<Candidate> finalCandidates = new ArrayList<>();
+  private int unionCandidatealiasCounter = 0;
+  private int joinCandidatealiasCounter = 0;
+
+  public CandidateCoveringSetsResolver(Configuration conf) {
+  }
+
+  @Override
+  public void rewriteContext(CubeQueryContext cubeql) throws LensException {
+    finalCandidates.clear(); // drop covering sets left over from a previous invocation, if any
+    Set<QueriedPhraseContext> queriedMsrs = new HashSet<>();
+    for (QueriedPhraseContext qur : cubeql.getQueriedPhrases()) {
+      if (qur.hasMeasures(cubeql)) {
+        queriedMsrs.add(qur);
+      }
+    }
+    // if no measures are queried, add all StorageCandidates individually as single covering sets
+    if (queriedMsrs.isEmpty()) {
+      finalCandidates.addAll(cubeql.getCandidates());
+    }
+
+    List<Candidate> unionSet = resolveRangeCoveringFactSet(cubeql, cubeql.getTimeRanges(), queriedMsrs);
+    List<List<Candidate>> measureCoveringSets = resolveJoinCandidates(unionSet, queriedMsrs, cubeql);
+    updateFinalCandidates(measureCoveringSets);
+    log.info("Covering candidate sets :{}", finalCandidates);
+
+    String msrString = CandidateUtil.getColumns(queriedMsrs).toString();
+    if (finalCandidates.isEmpty()) {
+      throw new LensException(LensCubeErrorCode.NO_FACT_HAS_COLUMN.getLensErrorInfo(), msrString);
+    }
+    // update final candidate sets
+    cubeql.getCandidates().clear();
+    cubeql.getCandidates().addAll(finalCandidates);
+    // TODO : we might need to prune if we maintain two data structures in CubeQueryContext.
+    //cubeql.pruneCandidateFactWithCandidateSet(CandidateTablePruneCause.columnNotFound(getColumns(queriedMsrs)));
+    //if (cubeql.getCandidates().size() == 0) {
+    //  throw new LensException(LensCubeErrorCode.NO_FACT_HAS_COLUMN.getLensErrorInfo(), msrString);
+    // }
+  }
+
+  private Candidate createJoinCandidateFromUnionCandidates(List<Candidate> ucs) {
+    Candidate cand;
+    if (ucs.size() >= 2) {
+      Candidate first = ucs.get(0);
+      Candidate second = ucs.get(1);
+      cand = new JoinCandidate(first, second, "jc" + joinCandidatealiasCounter++);
+      for (int i = 2; i < ucs.size(); i++) {
+        cand = new JoinCandidate(cand, ucs.get(i), "jc" + joinCandidatealiasCounter++);
+      }
+    } else {
+      cand = ucs.get(0);
+    }
+    return cand;
+  }
+
+  private void updateFinalCandidates(List<List<Candidate>> jcs) {
+    for (List<Candidate> jc : jcs) {
+      // getChildren() is null for a StorageCandidate; unwrap only a lone candidate with a single child
+      Collection<Candidate> children = jc.size() == 1 ? jc.get(0).getChildren() : null;
+      if (children != null && children.size() == 1) {
+        finalCandidates.add(children.iterator().next());
+      } else {
+        finalCandidates.add(createJoinCandidateFromUnionCandidates(jc));
+      }
+    }
+  }
+
+  private boolean isCandidateCoveringTimeRanges(UnionCandidate uc, List<TimeRange> ranges) {
+    for (Iterator<TimeRange> itr = ranges.iterator(); itr.hasNext(); ) {
+      TimeRange range = itr.next();
+      if (!CandidateUtil.isTimeRangeCovered(uc.getChildren(), range.getFromDate(), range.getToDate())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void pruneUnionCandidatesNotCoveringAllRanges(List<UnionCandidate> ucs, List<TimeRange> ranges) {
+    for (Iterator<UnionCandidate> itr = ucs.iterator(); itr.hasNext(); ) {
+      UnionCandidate uc = itr.next();
+      if (!isCandidateCoveringTimeRanges(uc, ranges)) {
+        itr.remove();
+      }
+    }
+  }
+
+  private List<Candidate> resolveRangeCoveringFactSet(CubeQueryContext cubeql, List<TimeRange> ranges,
+      Set<QueriedPhraseContext> queriedMsrs) throws LensException {
+    // All Candidates
+    List<Candidate> allCandidates = new ArrayList<Candidate>(cubeql.getCandidates());
+    // Partially valid candidates
+    List<Candidate> allCandidatesPartiallyValid = new ArrayList<>();
+    List<Candidate> candidateSet = new ArrayList<>();
+    for (Candidate cand : allCandidates) {
+      // Assuming initial list of candidates populated are StorageCandidate
+      if (cand instanceof StorageCandidate) {
+        StorageCandidate sc = (StorageCandidate) cand;
+        if (CandidateUtil.isValidForTimeRanges(sc, ranges)) {
+          candidateSet.add(sc);
+          continue;
+        } else if (CandidateUtil.isPartiallyValidForTimeRanges(sc, ranges)) {
+          allCandidatesPartiallyValid.add(CandidateUtil.cloneStorageCandidate(sc));
+        }
+      } else {
+        throw new LensException("Not a StorageCandidate!!");
+      }
+    }
+    // Get all covering fact sets
+    List<UnionCandidate> unionCoveringSet =
+        getCombinations(new ArrayList<Candidate>(allCandidatesPartiallyValid));
+    // Sort the Collection based on no of elements
+    Collections.sort(unionCoveringSet, new CandidateUtil.UnionCandidateComparator<UnionCandidate>());
+    // prune non covering sets
+    pruneUnionCandidatesNotCoveringAllRanges(unionCoveringSet, ranges);
+    // prune candidate set which doesn't contain any common measure
+    pruneUnionCoveringSetWithoutAnyCommonMeasure(unionCoveringSet, queriedMsrs, cubeql);
+    // prune redundant covering sets
+    pruneRedundantUnionCoveringSets(unionCoveringSet);
+    // pruning done in the previous steps, now create union candidates
+    candidateSet.addAll(unionCoveringSet);
+    return candidateSet;
+
+  }
+
+  private boolean isMeasureAnswerablebyUnionCandidate(QueriedPhraseContext msr, Candidate uc,
+      CubeQueryContext cubeql) throws LensException {
+    // Candidate is a single StorageCandidate
+    if (uc.getChildren() == null) {
+      if (!msr.isEvaluable(cubeql, (StorageCandidate) uc)) {
+        return false;
+      }
+    } else {
+      for (Candidate cand : uc.getChildren()) {
+        if (!msr.isEvaluable(cubeql, (StorageCandidate) cand)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  private void pruneUnionCoveringSetWithoutAnyCommonMeasure(List<UnionCandidate> ucs,
+      Set<QueriedPhraseContext> queriedMsrs,
+      CubeQueryContext cubeql) throws LensException {
+    for (ListIterator<UnionCandidate> itr = ucs.listIterator(); itr.hasNext(); ) {
+      boolean toRemove = true;
+      UnionCandidate uc = itr.next();
+      for (QueriedPhraseContext msr : queriedMsrs) {
+        if (isMeasureAnswerablebyUnionCandidate(msr, uc, cubeql)) {
+          toRemove = false;
+          break;
+        }
+      }
+      if (toRemove) {
+        itr.remove();
+      }
+    }
+  }
+
+  private void pruneRedundantUnionCoveringSets(List<UnionCandidate> candidates) {
+    for (int i = 0; i < candidates.size(); i++) {
+      UnionCandidate current = candidates.get(i);
+      int j = i + 1;
+      for (ListIterator<UnionCandidate> itr = candidates.listIterator(j); itr.hasNext(); ) {
+        UnionCandidate next = itr.next();
+        if (next.getChildren().containsAll(current.getChildren())) {
+          itr.remove();
+        }
+      }
+    }
+  }
+
+  public List<UnionCandidate> getCombinations(final List<Candidate> candidates) {
+    int size = candidates.size();
+    // Each bitmask i in [1, 2^size - 1] selects one non-empty subset; an int mask overflows from 31 elements on.
+    if (size >= Integer.SIZE - 1) {
+      throw new IllegalArgumentException("Too many candidates to combine: " + size);
+    }
+    List<UnionCandidate> combinations = new LinkedList<UnionCandidate>();
+    int threshold = (1 << size) - 1;
+    for (int i = 1; i <= threshold; ++i) {
+      LinkedList<Candidate> individualCombinationList = new LinkedList<Candidate>();
+      int clonedI = i;
+      for (int count = size - 1; count >= 0; --count) {
+        if ((clonedI & 1) != 0) {
+          individualCombinationList.addFirst(candidates.get(count));
+        }
+        clonedI = clonedI >>> 1;
+      }
+      combinations.add(new UnionCandidate(individualCombinationList, "uc" + unionCandidatealiasCounter++));
+    }
+    return combinations;
+  }
+
+  private List<List<Candidate>> resolveJoinCandidates(List<Candidate> unionCandidates,
+      Set<QueriedPhraseContext> msrs,
+      CubeQueryContext cubeql) throws LensException {
+    List<List<Candidate>> msrCoveringSets = new ArrayList<>();
+    List<Candidate> ucSet = new ArrayList<>(unionCandidates);
+    boolean evaluable = false;
+    // Check if a single set can answer all the measures and exprsWithMeasures
+    for (Iterator<Candidate> i = ucSet.iterator(); i.hasNext(); ) {
+      Candidate uc = i.next();
+      for (QueriedPhraseContext msr : msrs) {
+        evaluable = isMeasureAnswerablebyUnionCandidate(msr, uc, cubeql);
+        if (!evaluable) {
+          break;
+        }
+      }
+      if (evaluable) {
+        // single set can answer all the measures as an UnionCandidate
+        List<Candidate> one = new ArrayList<>();
+        one.add(uc);
+        msrCoveringSets.add(one);
+        i.remove();
+      }
+    }
+    // Sets that answer all the measures have been removed from iteration above.
+    // find other facts
+    for (Iterator<Candidate> i = ucSet.iterator(); i.hasNext(); ) {
+      Candidate uc = i.next();
+      i.remove();
+      // find the remaining measures in other facts
+      if (i.hasNext()) {
+        Set<QueriedPhraseContext> remainingMsrs = new HashSet<>(msrs);
+        Set<QueriedPhraseContext> coveredMsrs = CandidateUtil.coveredMeasures(uc, msrs, cubeql);
+        remainingMsrs.removeAll(coveredMsrs);
+
+        List<List<Candidate>> coveringSets = resolveJoinCandidates(ucSet, remainingMsrs, cubeql);
+        if (!coveringSets.isEmpty()) {
+          for (List<Candidate> candSet : coveringSets) {
+            candSet.add(uc);
+            msrCoveringSets.add(candSet);
+          }
+        } else {
+          log.info("Couldn't find any set containing remaining measures:{} in {}", remainingMsrs,
+              ucSet);
+        }
+      }
+    }
+    log.info("Covering set {} for measures {} with factsPassed {}", msrCoveringSets, msrs, unionCandidates);
+    return msrCoveringSets;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateDim.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateDim.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateDim.java
index 4dcdbcf..0dde72d 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateDim.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateDim.java
@@ -38,7 +38,7 @@ public class CandidateDim implements CandidateTable {
final CubeDimensionTable dimtable;
@Getter
@Setter
- private String storageTable;
+ private String storageName;
@Getter
@Setter
private String whereClause;
@@ -73,11 +73,11 @@ public class CandidateDim implements CandidateTable {
String database = SessionState.get().getCurrentDatabase();
// Add database name prefix for non default database
if (StringUtils.isNotBlank(database) && !"default".equalsIgnoreCase(database)) {
- storageTable = database + "." + storageTable;
+ storageName = database + "." + storageName;
}
dbResolved = true;
}
- return storageTable + " " + alias;
+ return storageName + " " + alias;
}
@Override
@@ -124,12 +124,7 @@ public class CandidateDim implements CandidateTable {
}
@Override
- public Set<String> getStorageTables() {
- return Collections.singleton(storageTable);
- }
-
- @Override
- public Set<String> getPartsQueried() {
+ public Set<String> getParticipatingPartitions() {
if (StringUtils.isBlank(whereClause)) {
return Collections.emptySet();
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateFact.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateFact.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateFact.java
index b42262d..18478f8 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateFact.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateFact.java
@@ -39,6 +39,7 @@ import com.google.common.collect.Sets;
import lombok.Getter;
import lombok.Setter;
+//TODO union : delete this class and use Candidate and StorageCandidtae
/**
* Holds context of a candidate fact table.
*/
@@ -110,6 +111,11 @@ public class CandidateFact implements CandidateTable, QueryAST {
return columns;
}
+ @Override
+ public Set<?> getParticipatingPartitions() {
+ return null;
+ }
+
public boolean isValidForTimeRange(TimeRange timeRange) {
return (!timeRange.getFromDate().before(fact.getStartTime())) && (!timeRange.getToDate().after(fact.getEndTime()));
}
@@ -241,6 +247,11 @@ public class CandidateFact implements CandidateTable, QueryAST {
return StringUtils.join(storageTables, ",") + " " + alias;
}
+ @Override
+ public String getStorageName() {
+ return null;
+ }
+
public void setStorageTables(Set<String> storageTables) {
String database = SessionState.get().getCurrentDatabase();
// Add database name prefix for non default database
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java
index e001ca4..5863c1c 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java
@@ -37,10 +37,10 @@ public interface CandidateTable {
String getStorageString(String alias);
/**
- * Get storage tables corresponding to this candidate
+ * Get storage table corresponding to this candidate
* @return
*/
- Set<String> getStorageTables();
+ String getStorageName();
/**
* Get candidate table
@@ -73,5 +73,6 @@ public interface CandidateTable {
/**
* Get partitions queried
*/
- Set<?> getPartsQueried();
+ //TODO union: Name changed
+ Set<?> getParticipatingPartitions();
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java
index 2ad6e20..41814f0 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java
@@ -38,6 +38,7 @@ import lombok.NoArgsConstructor;
@JsonWriteNullProperties(false)
@Data
@NoArgsConstructor
+//TODO union: Since we are working on StorageCandidates now, we might need some changes here
public class CandidateTablePruneCause {
public enum CandidateTablePruneCode {
@@ -158,8 +159,25 @@ public class CandidateTablePruneCause {
}
return new String[]{incompletePartitions.toString()};
}
- };
+ },
+ // Moved from the storage skip causes (SkipStorageCode)
+ INVALID_STORAGE("Invalid Storage"),
+ // storage table does not exist
+ STOARGE_TABLE_DOES_NOT_EXIST("Storage table does not exist"),
+ // storage has no update periods queried
+ MISSING_UPDATE_PERIODS("Storage has no update periods"),
+ // no candidate update periods, update period cause will have why each
+ // update period is not a candidate
+ NO_CANDIDATE_UPDATE_PERIODS("Storage update periods are not candidate"),
+ // storage table has no partitions queried
+ NO_PARTITIONS("Storage table has no partitions"),
+ // partition column does not exist
+ PART_COL_DOES_NOT_EXIST("Partition column does not exist"),
+ // Range is not supported by this storage table
+ TIME_RANGE_NOT_ANSWERABLE("Range not answerable"),
+ // storage is not supported by execution engine
+ UNSUPPORTED_STORAGE("Unsupported Storage");
String errorFormat;
@@ -180,6 +198,8 @@ public class CandidateTablePruneCause {
}
}
+ //TODO union : Remove this enum. All values moved to CandidateTablePruneCode
+ @Deprecated
public enum SkipStorageCode {
// invalid storage table
INVALID,
@@ -210,16 +230,21 @@ public class CandidateTablePruneCause {
@JsonWriteNullProperties(false)
@Data
@NoArgsConstructor
+ //TODO union:deprecate this sub class
+ @Deprecated
public static class SkipStorageCause {
private SkipStorageCode cause;
// update period to skip cause
private Map<String, SkipUpdatePeriodCode> updatePeriodRejectionCause;
+
private List<String> nonExistantPartCols;
+ /**
+ * Creates a storage skip cause with the given code.
+ *
+ * @param cause the storage skip code
+ * @deprecated storage skip causes are being replaced by {@link CandidateTablePruneCause} instances built with a
+ * {@link CandidateTablePruneCode}.
+ */
+ @Deprecated
public SkipStorageCause(SkipStorageCode cause) {
this.cause = cause;
}
+ @Deprecated
public static SkipStorageCause partColDoesNotExist(String... partCols) {
SkipStorageCause ret = new SkipStorageCause(SkipStorageCode.PART_COL_DOES_NOT_EXIST);
ret.nonExistantPartCols = new ArrayList<String>();
@@ -229,6 +254,7 @@ public class CandidateTablePruneCause {
return ret;
}
+ @Deprecated
public static SkipStorageCause noCandidateUpdatePeriod(Map<String, SkipUpdatePeriodCode> causes) {
SkipStorageCause ret = new SkipStorageCause(SkipStorageCode.NO_CANDIDATE_PERIODS);
ret.updatePeriodRejectionCause = causes;
@@ -263,6 +289,11 @@ public class CandidateTablePruneCause {
// ranges in which fact is invalid
private List<TimeRange> invalidRanges;
+ private List<String> nonExistantPartCols;
+
+ private Map<String, SkipUpdatePeriodCode> updatePeriodRejectionCause;
+
+
+ /**
+ * Creates a prune cause that carries only the given code and no additional details.
+ *
+ * @param cause the prune code
+ */
public CandidateTablePruneCause(CandidateTablePruneCode cause) {
this.cause = cause;
}
@@ -338,7 +369,9 @@ public class CandidateTablePruneCause {
return cause;
}
- public static CandidateTablePruneCause noCandidateStorages(Map<String, SkipStorageCause> storageCauses) {
+ //TODO union : Remove this method
+ @Deprecated
+ public static CandidateTablePruneCause noCandidateStorages(Map<String, SkipStorageCause> storageCauses) {
CandidateTablePruneCause cause = new CandidateTablePruneCause(NO_CANDIDATE_STORAGES);
cause.setStorageCauses(new HashMap<String, SkipStorageCause>());
for (Map.Entry<String, SkipStorageCause> entry : storageCauses.entrySet()) {
@@ -354,4 +387,27 @@ public class CandidateTablePruneCause {
cause.setColumnsMissingDefaultAggregate(Lists.newArrayList(names));
return cause;
}
+
+  /**
+   * Builds a {@link CandidateTablePruneCode#PART_COL_DOES_NOT_EXIST} cause for a storage candidate that does not
+   * have some of the queried partition columns.
+   *
+   * @param missingPartitionColumns names of the queried partition columns that are absent
+   * @return the prune cause, carrying the given list as its non-existent partition columns
+   */
+  public static CandidateTablePruneCause partitionColumnsMissing(final List<String> missingPartitionColumns) {
+    final CandidateTablePruneCause pruneCause = new CandidateTablePruneCause(PART_COL_DOES_NOT_EXIST);
+    pruneCause.nonExistantPartCols = missingPartitionColumns;
+    return pruneCause;
+  }
+
+  /**
+   * Builds a {@link CandidateTablePruneCode#NO_CANDIDATE_UPDATE_PERIODS} cause for a storage candidate whose
+   * update periods were all rejected.
+   *
+   * @param updatePeriodRejectionCause rejection cause for each update period
+   * @return the prune cause, carrying the given per-update-period causes
+   */
+  public static CandidateTablePruneCause updatePeriodsRejected(
+    final Map<String, SkipUpdatePeriodCode> updatePeriodRejectionCause) {
+    final CandidateTablePruneCause pruneCause = new CandidateTablePruneCause(NO_CANDIDATE_UPDATE_PERIODS);
+    pruneCause.updatePeriodRejectionCause = updatePeriodRejectionCause;
+    return pruneCause;
+  }
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTableResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTableResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTableResolver.java
index e7fc557..dd098b1 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTableResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTableResolver.java
@@ -33,7 +33,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.Sets;
-
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -59,6 +58,7 @@ class CandidateTableResolver implements ContextRewriter {
public void rewriteContext(CubeQueryContext cubeql) throws LensException {
if (checkForQueriedColumns) {
log.debug("Dump queried columns:{}", cubeql.getTblAliasToColumns());
+ //TODO union : populateCandidateTables now creates StorageCandidates
populateCandidateTables(cubeql);
resolveCandidateFactTables(cubeql);
resolveCandidateDimTables(cubeql);
@@ -88,6 +88,7 @@ class CandidateTableResolver implements ContextRewriter {
}
private void populateCandidateTables(CubeQueryContext cubeql) throws LensException {
+ int aliasCounter = 0;
if (cubeql.getCube() != null) {
List<CubeFactTable> factTables = cubeql.getMetastoreClient().getAllFacts(cubeql.getCube());
if (factTables.isEmpty()) {
@@ -95,10 +96,11 @@ class CandidateTableResolver implements ContextRewriter {
cubeql.getCube().getName() + " does not have any facts");
}
for (CubeFactTable fact : factTables) {
- CandidateFact cfact = new CandidateFact(fact, cubeql.getCube());
- cubeql.getCandidateFacts().add(cfact);
+ StorageCandidate sc = new StorageCandidate(cubeql.getCube(), fact,
+ fact.getStorages().iterator().next(), "sc" + aliasCounter++, cubeql);
+ cubeql.getCandidates().add(sc);
}
- log.info("Populated candidate facts: {}", cubeql.getCandidateFacts());
+ log.info("Populated storage candidates: {}", cubeql.getCandidates());
}
if (cubeql.getDimensions().size() != 0) {
@@ -158,10 +160,10 @@ class CandidateTableResolver implements ContextRewriter {
OptionalDimCtx optdim = cubeql.getOptionalDimensionMap().remove(dim);
// remove all the depending candidate table as well
for (CandidateTable candidate : optdim.requiredForCandidates) {
- if (candidate instanceof CandidateFact) {
- log.info("Not considering fact:{} as refered table does not have any valid dimtables", candidate);
+ if (candidate instanceof StorageCandidate) {
+ log.info("Not considering storage candidate:{} as refered table does not have any valid dimtables", candidate);
cubeql.getCandidateFacts().remove(candidate);
- cubeql.addFactPruningMsgs(((CandidateFact) candidate).fact, new CandidateTablePruneCause(
+ cubeql.addStoragePruningMsg(((StorageCandidate) candidate), new CandidateTablePruneCause(
CandidateTablePruneCode.INVALID_DENORM_TABLE));
} else {
log.info("Not considering dimtable:{} as refered table does not have any valid dimtables", candidate);
@@ -176,20 +178,20 @@ class CandidateTableResolver implements ContextRewriter {
}
}
- public static boolean isColumnAvailableInRange(final TimeRange range, Date startTime, Date endTime) {
+ /**
+ * Checks that a column whose availability window is [startTime, endTime] can be queried for the whole range.
+ * A {@code null} start or end time means the column is unbounded on that side; both bounds are inclusive.
+ */
+ private static boolean isColumnAvailableInRange(final TimeRange range, Date startTime, Date endTime) {
return (isColumnAvailableFrom(range.getFromDate(), startTime)
&& isColumnAvailableTill(range.getToDate(), endTime));
}
- public static boolean isColumnAvailableFrom(@NonNull final Date date, Date startTime) {
+ /**
+ * Returns true if {@code date} is on or after {@code startTime}, or if {@code startTime} is {@code null}.
+ */
+ private static boolean isColumnAvailableFrom(@NonNull final Date date, Date startTime) {
return (startTime == null) ? true : date.equals(startTime) || date.after(startTime);
}
- public static boolean isColumnAvailableTill(@NonNull final Date date, Date endTime) {
+ /**
+ * Returns true if {@code date} is on or before {@code endTime}, or if {@code endTime} is {@code null}.
+ */
+ private static boolean isColumnAvailableTill(@NonNull final Date date, Date endTime) {
return (endTime == null) ? true : date.equals(endTime) || date.before(endTime);
}
- public static boolean isFactColumnValidForRange(CubeQueryContext cubeql, CandidateTable cfact, String col) {
+ private static boolean isFactColumnValidForRange(CubeQueryContext cubeql, CandidateTable cfact, String col) {
for(TimeRange range : cubeql.getTimeRanges()) {
if (!isColumnAvailableInRange(range, getFactColumnStartTime(cfact, col), getFactColumnEndTime(cfact, col))) {
return false;
@@ -198,7 +200,7 @@ class CandidateTableResolver implements ContextRewriter {
return true;
}
- public static Date getFactColumnStartTime(CandidateTable table, String factCol) {
+ private static Date getFactColumnStartTime(CandidateTable table, String factCol) {
Date startTime = null;
if (table instanceof CandidateFact) {
for (String key : ((CandidateFact) table).fact.getProperties().keySet()) {
@@ -213,7 +215,7 @@ class CandidateTableResolver implements ContextRewriter {
return startTime;
}
- public static Date getFactColumnEndTime(CandidateTable table, String factCol) {
+ private static Date getFactColumnEndTime(CandidateTable table, String factCol) {
Date endTime = null;
if (table instanceof CandidateFact) {
for (String key : ((CandidateFact) table).fact.getProperties().keySet()) {
@@ -232,7 +234,7 @@ class CandidateTableResolver implements ContextRewriter {
if (cubeql.getCube() != null) {
String str = cubeql.getConf().get(CubeQueryConfUtil.getValidFactTablesKey(cubeql.getCube().getName()));
List<String> validFactTables =
- StringUtils.isBlank(str) ? null : Arrays.asList(StringUtils.split(str.toLowerCase(), ","));
+ StringUtils.isBlank(str) ? null : Arrays.asList(StringUtils.split(str.toLowerCase(), ","));
Set<QueriedPhraseContext> queriedMsrs = new HashSet<>();
Set<QueriedPhraseContext> dimExprs = new HashSet<>();
@@ -243,100 +245,75 @@ class CandidateTableResolver implements ContextRewriter {
dimExprs.add(qur);
}
}
- // Remove fact tables based on whether they are valid or not.
- for (Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator(); i.hasNext();) {
- CandidateFact cfact = i.next();
+ // Remove storage candidates based on whether they are valid or not.
+ for (Iterator<Candidate> i = cubeql.getCandidates().iterator(); i.hasNext(); ) {
+ Candidate cand = i.next();
+ if (cand instanceof StorageCandidate) {
+ StorageCandidate sc = (StorageCandidate) cand;
+ if (validFactTables != null) {
+ if (!validFactTables.contains(sc.getName().toLowerCase())) {
+ log.info("Not considering storage candidate:{} as it is not a valid candidate", sc);
+ cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.INVALID));
+ i.remove();
+ continue;
+ }
+ }
- if (validFactTables != null) {
- if (!validFactTables.contains(cfact.getName().toLowerCase())) {
- log.info("Not considering fact table:{} as it is not a valid fact", cfact);
- cubeql
- .addFactPruningMsgs(cfact.fact, new CandidateTablePruneCause(CandidateTablePruneCode.INVALID));
- i.remove();
- continue;
+ // update expression evaluability for this fact
+ for (String expr : cubeql.getQueriedExprs()) {
+ cubeql.getExprCtx().updateEvaluables(expr, sc);
}
- }
- // update expression evaluability for this fact
- for (String expr : cubeql.getQueriedExprs()) {
- cubeql.getExprCtx().updateEvaluables(expr, cfact);
- }
+ // go over the columns accessed in the query and find out which tables
+ // can answer the query
+ // the candidate facts should have all the dimensions queried and
+ // atleast
+ // one measure
+ boolean toRemove = false;
+ for (QueriedPhraseContext qur : dimExprs) {
+ if (!qur.isEvaluable(cubeql, sc)) {
+ log.info("Not considering storage candidate:{} as columns {} are not available", sc, qur.getColumns());
+ cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.columnNotFound(qur.getColumns()));
+ toRemove = true;
+ break;
+ }
+ }
- // go over the columns accessed in the query and find out which tables
- // can answer the query
- // the candidate facts should have all the dimensions queried and
- // atleast
- // one measure
- boolean toRemove = false;
- for (QueriedPhraseContext qur : dimExprs) {
- if (!qur.isEvaluable(cubeql, cfact)) {
- log.info("Not considering fact table:{} as columns {} are not available", cfact, qur.getColumns());
- cubeql.addFactPruningMsgs(cfact.fact, CandidateTablePruneCause.columnNotFound(qur.getColumns()));
+ // check if the candidate fact has atleast one measure queried
+ // if expression has measures, they should be considered along with other measures and see if the fact can be
+ // part of measure covering set
+ if (!checkForFactColumnExistsAndValidForRange(sc, queriedMsrs, cubeql)) {
+ Set<String> columns = getColumns(queriedMsrs);
+ log.info("Not considering storage candidate:{} as columns {} is not available", sc, columns);
+ cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.columnNotFound(columns));
toRemove = true;
- break;
}
- }
-
- // check if the candidate fact has atleast one measure queried
- // if expression has measures, they should be considered along with other measures and see if the fact can be
- // part of measure covering set
- if (!checkForFactColumnExistsAndValidForRange(cfact, queriedMsrs, cubeql)) {
- Set<String> columns = getColumns(queriedMsrs);
- log.info("Not considering fact table:{} as columns {} is not available", cfact, columns);
- cubeql.addFactPruningMsgs(cfact.fact, CandidateTablePruneCause.columnNotFound(columns));
- toRemove = true;
- }
- // go over join chains and prune facts that dont have any of the columns in each chain
- for (JoinChain chain : cubeql.getJoinchains().values()) {
- OptionalDimCtx optdim = cubeql.getOptionalDimensionMap().get(Aliased.create((Dimension)cubeql.getCubeTbls()
- .get(chain.getName()), chain.getName()));
- if (!checkForFactColumnExistsAndValidForRange(cfact, chain.getSourceColumns(), cubeql)) {
- // check if chain is optional or not
- if (optdim == null) {
- log.info("Not considering fact table:{} as columns {} are not available", cfact,
- chain.getSourceColumns());
- cubeql.addFactPruningMsgs(cfact.fact, CandidateTablePruneCause.columnNotFound(chain.getSourceColumns()));
- toRemove = true;
- break;
+ // go over join chains and prune facts that dont have any of the columns in each chain
+ for (JoinChain chain : cubeql.getJoinchains().values()) {
+ OptionalDimCtx optdim = cubeql.getOptionalDimensionMap().get(Aliased.create((Dimension) cubeql.getCubeTbls()
+ .get(chain.getName()), chain.getName()));
+ if (!checkForFactColumnExistsAndValidForRange(sc, chain.getSourceColumns(), cubeql)) {
+ // check if chain is optional or not
+ if (optdim == null) {
+ log.info("Not considering storage candidate:{} as columns {} are not available", sc,
+ chain.getSourceColumns());
+ cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.columnNotFound(chain.getSourceColumns()));
+ toRemove = true;
+ break;
+ }
}
}
- }
- if (toRemove) {
- i.remove();
- }
- }
- if (cubeql.getCandidateFacts().size() == 0) {
- throw new LensException(LensCubeErrorCode.NO_FACT_HAS_COLUMN.getLensErrorInfo(),
- getColumns(cubeql.getQueriedPhrases()).toString());
- }
- Set<Set<CandidateFact>> cfactset;
- if (queriedMsrs.isEmpty()) {
- // if no measures are queried, add all facts individually as single covering sets
- cfactset = new HashSet<>();
- for (CandidateFact cfact : cubeql.getCandidateFacts()) {
- Set<CandidateFact> one = new LinkedHashSet<>();
- one.add(cfact);
- cfactset.add(one);
- }
- cubeql.getCandidateFactSets().addAll(cfactset);
- } else {
- // Find out candidate fact table sets which contain all the measures
- // queried
-
- List<CandidateFact> cfacts = new ArrayList<>(cubeql.getCandidateFacts());
- cfactset = findCoveringSets(cubeql, cfacts, queriedMsrs);
- log.info("Measure covering fact sets :{}", cfactset);
- String msrString = getColumns(queriedMsrs).toString();
- if (cfactset.isEmpty()) {
- throw new LensException(LensCubeErrorCode.NO_FACT_HAS_COLUMN.getLensErrorInfo(), msrString);
+ if (toRemove) {
+ i.remove();
+ }
+ } else {
+ throw new LensException("Not a storage candidate!!");
}
- cubeql.getCandidateFactSets().addAll(cfactset);
- cubeql.pruneCandidateFactWithCandidateSet(CandidateTablePruneCause.columnNotFound(getColumns(queriedMsrs)));
-
- if (cubeql.getCandidateFacts().size() == 0) {
- throw new LensException(LensCubeErrorCode.NO_FACT_HAS_COLUMN.getLensErrorInfo(), msrString);
+ if (cubeql.getCandidates().size() == 0) {
+ throw new LensException(LensCubeErrorCode.NO_FACT_HAS_COLUMN.getLensErrorInfo(),
+ getColumns(cubeql.getQueriedPhrases()).toString());
}
}
}
@@ -349,51 +326,6 @@ class CandidateTableResolver implements ContextRewriter {
}
return cols;
}
- static Set<Set<CandidateFact>> findCoveringSets(CubeQueryContext cubeql, List<CandidateFact> cfactsPassed,
- Set<QueriedPhraseContext> msrs) throws LensException {
- Set<Set<CandidateFact>> cfactset = new HashSet<>();
- List<CandidateFact> cfacts = new ArrayList<>(cfactsPassed);
- for (Iterator<CandidateFact> i = cfacts.iterator(); i.hasNext();) {
- CandidateFact cfact = i.next();
- if (!checkForFactColumnExistsAndValidForRange(cfact, msrs, cubeql)) {
- // cfact does not contain any of msrs and none of exprsWithMeasures are evaluable.
- // ignore the fact
- i.remove();
- continue;
- } else if (allEvaluable(cfact, msrs, cubeql)) {
- // return single set
- Set<CandidateFact> one = new LinkedHashSet<>();
- one.add(cfact);
- cfactset.add(one);
- i.remove();
- }
- }
- // facts that contain all measures or no measures are removed from iteration.
- // find other facts
- for (Iterator<CandidateFact> i = cfacts.iterator(); i.hasNext();) {
- CandidateFact cfact = i.next();
- i.remove();
- // find the remaining measures in other facts
- if (i.hasNext()) {
- Set<QueriedPhraseContext> remainingMsrs = new HashSet<>(msrs);
- Set<QueriedPhraseContext> coveredMsrs = coveredMeasures(cfact, msrs, cubeql);
- remainingMsrs.removeAll(coveredMsrs);
-
- Set<Set<CandidateFact>> coveringSets = findCoveringSets(cubeql, cfacts, remainingMsrs);
- if (!coveringSets.isEmpty()) {
- for (Set<CandidateFact> set : coveringSets) {
- set.add(cfact);
- cfactset.add(set);
- }
- } else {
- log.info("Couldnt find any set containing remaining measures:{} {} in {}", remainingMsrs,
- cfactsPassed);
- }
- }
- }
- log.info("Covering set {} for measures {} with factsPassed {}", cfactset, msrs, cfactsPassed);
- return cfactset;
- }
private void resolveCandidateDimTablesForJoinsAndDenorms(CubeQueryContext cubeql) throws LensException {
if (cubeql.getAutoJoinCtx() == null) {
@@ -720,7 +652,7 @@ class CandidateTableResolver implements ContextRewriter {
// The candidate table contains atleast one column in the colSet and
// column can the queried in the range specified
- static boolean checkForFactColumnExistsAndValidForRange(CandidateTable table, Collection<String> colSet,
+ private static boolean checkForFactColumnExistsAndValidForRange(CandidateTable table, Collection<String> colSet,
CubeQueryContext cubeql) {
if (colSet == null || colSet.isEmpty()) {
return true;
@@ -733,37 +665,39 @@ class CandidateTableResolver implements ContextRewriter {
return false;
}
- static boolean checkForFactColumnExistsAndValidForRange(CandidateFact table, Collection<QueriedPhraseContext> colSet,
- CubeQueryContext cubeql) throws LensException {
+
+ /**
+ * Returns true if at least one queried phrase in {@code colSet} is evaluable on the storage candidate.
+ * Also returns true when {@code colSet} is null or empty.
+ */
+ private static boolean checkForFactColumnExistsAndValidForRange(StorageCandidate sc,
+ Collection<QueriedPhraseContext> colSet,
+ CubeQueryContext cubeql) throws LensException {
if (colSet == null || colSet.isEmpty()) {
return true;
}
for (QueriedPhraseContext qur : colSet) {
- if (qur.isEvaluable(cubeql, table)) {
+ if (qur.isEvaluable(cubeql, sc)) {
return true;
}
}
return false;
}
- static boolean allEvaluable(CandidateFact table, Collection<QueriedPhraseContext> colSet,
- CubeQueryContext cubeql) throws LensException {
+ /**
+ * Returns true if every queried phrase in {@code colSet} is evaluable on the storage candidate.
+ * Trivially true when {@code colSet} is null or empty.
+ */
+ static boolean allEvaluable(StorageCandidate sc, Collection<QueriedPhraseContext> colSet,
+ CubeQueryContext cubeql) throws LensException {
if (colSet == null || colSet.isEmpty()) {
return true;
}
for (QueriedPhraseContext qur : colSet) {
- if (!qur.isEvaluable(cubeql, table)) {
+ if (!qur.isEvaluable(cubeql, sc)) {
return false;
}
}
return true;
}
- static Set<QueriedPhraseContext> coveredMeasures(CandidateFact table, Collection<QueriedPhraseContext> msrs,
- CubeQueryContext cubeql) throws LensException {
+ static Set<QueriedPhraseContext> coveredMeasures(StorageCandidate sc, Collection<QueriedPhraseContext> msrs,
+ CubeQueryContext cubeql) throws LensException {
Set<QueriedPhraseContext> coveringSet = new HashSet<>();
for (QueriedPhraseContext msr : msrs) {
- if (msr.isEvaluable(cubeql, table)) {
+ if (msr.isEvaluable(cubeql, sc)) {
coveringSet.add(msr);
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateUtil.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateUtil.java
new file mode 100644
index 0000000..dd3b1dd
--- /dev/null
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateUtil.java
@@ -0,0 +1,208 @@
+package org.apache.lens.cube.parse;
+
+import java.util.*;
+
+import org.apache.lens.cube.metadata.CubeMetastoreClient;
+import org.apache.lens.cube.metadata.MetastoreUtil;
+import org.apache.lens.cube.metadata.TimeRange;
+import org.apache.lens.server.api.error.LensException;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+
+/**
+ * Placeholder for utility methods that are required while working with {@link Candidate}s.
+ */
+public class CandidateUtil {
+
+  /**
+   * Checks whether a calculated measure expression is answerable by the candidate. The candidate must have
+   * every column that the expression references, resolved against the cube's alias.
+   *
+   * @param exprNode  expression to check
+   * @param candidate candidate that should answer the expression
+   * @param context   query context used to resolve the cube alias
+   * @return true if all columns used in {@code exprNode} are available in {@code candidate}
+   * @throws LensException if resolving the expression's columns fails
+   */
+  public static boolean isMeasureExpressionAnswerable(ASTNode exprNode, Candidate candidate, CubeQueryContext context)
+    throws LensException {
+    return candidate.getColumns().containsAll(HQLParser.getColsInExpr(
+      context.getAliasForTableName(context.getCube()), exprNode));
+  }
+
+  /**
+   * Returns true if the candidate's availability window (from its start time to its end time) encloses every one
+   * of the given time ranges.
+   * Boundaries are inclusive, so a range that starts exactly at the candidate's start time, or ends exactly at its
+   * end time, is considered valid. This matches the semantics of
+   * {@link #isTimeRangeCovered(Collection, Date, Date)}.
+   *
+   * @param candidate  candidate to check
+   * @param timeRanges queried time ranges
+   * @return true if the candidate is valid for all the time ranges (also true when {@code timeRanges} is empty)
+   */
+  public static boolean isValidForTimeRanges(Candidate candidate, List<TimeRange> timeRanges) {
+    for (TimeRange timeRange : timeRanges) {
+      if (timeRange.getFromDate().before(candidate.getStartTime())
+        || timeRange.getToDate().after(candidate.getEndTime())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns true if the candidate's availability window overlaps at least one of the given time ranges.
+   * The overlap may be any of these:
+   * <ul>
+   *   <li>the candidate lies completely inside a range;</li>
+   *   <li>the candidate straddles one of the range's boundaries;</li>
+   *   <li>the candidate encloses the range.</li>
+   * </ul>
+   *
+   * @param cand       candidate to check
+   * @param timeRanges queried time ranges
+   * @return true if the candidate can answer at least part of one of the time ranges
+   */
+  public static boolean isPartiallyValidForTimeRanges(Candidate cand, List<TimeRange> timeRanges) {
+    for (TimeRange timeRange : timeRanges) {
+      // Two intervals overlap iff each one starts before the other one ends. Checking only whether the candidate
+      // straddles a range endpoint would miss a candidate lying strictly inside the range.
+      if (cand.getStartTime().before(timeRange.getToDate()) && cand.getEndTime().after(timeRange.getFromDate())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Gets the time partition columns for a storage candidate. These are the partition keys of its metastore table
+   * whose corresponding time dimension is one of the cube's timed dimensions.
+   * TODO decide if this needs to be supported for all Candidate types.
+   * NOTE(review): the metastore table is looked up by {@link StorageCandidate#getStorageName()}; confirm that this
+   * is the storage table name and not just the storage name.
+   *
+   * @param candidate       storage candidate
+   * @param metastoreClient cube metastore client
+   * @return names of the time partition columns
+   * @throws LensException if the metastore lookup fails
+   */
+  public Set<String> getTimePartitionCols(StorageCandidate candidate, CubeMetastoreClient metastoreClient)
+    throws LensException {
+    Set<String> cubeTimeDimensions = candidate.getCube().getTimedDimensions();
+    Set<String> timePartDimensions = new HashSet<>();
+    String singleStorageTable = candidate.getStorageName();
+    List<FieldSchema> partitionKeys = metastoreClient.getTable(singleStorageTable).getPartitionKeys();
+    for (FieldSchema fs : partitionKeys) {
+      if (cubeTimeDimensions.contains(CubeQueryContext.getTimeDimOfPartitionColumn(candidate.getCube(),
+        fs.getName()))) {
+        timePartDimensions.add(fs.getName());
+      }
+    }
+    return timePartDimensions;
+  }
+
+  /**
+   * Copies the query ASTs from {@code sourceAst} to {@code targetAst}. The select and where ASTs are always copied;
+   * the join and group-by ASTs are copied only when the source has them.
+   *
+   * @param sourceAst AST to copy from
+   * @param targetAst AST to copy into
+   * @throws LensException if copying an AST fails
+   */
+  public void copyASTs(QueryAST sourceAst, QueryAST targetAst) throws LensException {
+    targetAst.setSelectAST(MetastoreUtil.copyAST(sourceAst.getSelectAST()));
+    targetAst.setWhereAST(MetastoreUtil.copyAST(sourceAst.getWhereAST()));
+    if (sourceAst.getJoinAST() != null) {
+      targetAst.setJoinAST(MetastoreUtil.copyAST(sourceAst.getJoinAST()));
+    }
+    if (sourceAst.getGroupByAST() != null) {
+      targetAst.setGroupByAST(MetastoreUtil.copyAST(sourceAst.getGroupByAST()));
+    }
+  }
+
+  /**
+   * Gets all the storage candidates that participate in the given candidate. If the candidate has no children,
+   * the result is the candidate itself.
+   *
+   * @param candidate candidate to expand
+   * @return participating storage candidates
+   */
+  public static Set<StorageCandidate> getStorageCandidates(final Candidate candidate) {
+    // A singleton view avoids the anonymous HashSet subclass created by double-brace initialisation.
+    return getStorageCandidates(Collections.singleton(candidate));
+  }
+
+  /**
+   * Finds the measures in {@code msrs} that are evaluable on the candidate.
+   * For a candidate without children, the candidate itself is checked. Otherwise a measure counts as covered if it
+   * is evaluable on any direct child.
+   * NOTE(review): the candidate, or each of its direct children, is cast to {@link StorageCandidate}. A nested
+   * composite candidate will therefore fail with a ClassCastException; confirm callers only pass one level of
+   * nesting.
+   *
+   * @param candSet candidate to check
+   * @param msrs    queried measures
+   * @param cubeql  query context
+   * @return the covered measures
+   * @throws LensException if evaluability cannot be determined
+   */
+  public static Set<QueriedPhraseContext> coveredMeasures(Candidate candSet, Collection<QueriedPhraseContext> msrs,
+    CubeQueryContext cubeql) throws LensException {
+    Set<QueriedPhraseContext> coveringSet = new HashSet<>();
+    for (QueriedPhraseContext msr : msrs) {
+      if (candSet.getChildren() == null) {
+        if (msr.isEvaluable(cubeql, (StorageCandidate) candSet)) {
+          coveringSet.add(msr);
+        }
+      } else {
+        for (Candidate cand : candSet.getChildren()) {
+          if (msr.isEvaluable(cubeql, (StorageCandidate) cand)) {
+            coveringSet.add(msr);
+          }
+        }
+      }
+    }
+    return coveringSet;
+  }
+
+  /**
+   * Returns true if the candidates together cover the entire time range. Each candidate contributes the half-open
+   * window [start time, end time), and their union must enclose [startTime, endTime).
+   *
+   * @param candidates candidates whose windows are combined
+   * @param startTime  start of the range to cover (inclusive)
+   * @param endTime    end of the range to cover (exclusive)
+   * @return true if the range is fully covered
+   */
+  public static boolean isTimeRangeCovered(Collection<Candidate> candidates, Date startTime, Date endTime) {
+    RangeSet<Date> set = TreeRangeSet.create();
+    for (Candidate candidate : candidates) {
+      set.add(Range.range(candidate.getStartTime(), BoundType.CLOSED, candidate.getEndTime(), BoundType.OPEN));
+    }
+    return set.encloses(Range.range(startTime, BoundType.CLOSED, endTime, BoundType.OPEN));
+  }
+
+  /**
+   * Collects the union of the columns of all the given queried phrases.
+   *
+   * @param queriedPhraseContexts queried phrases
+   * @return all columns referenced by the phrases
+   */
+  public static Set<String> getColumns(Collection<QueriedPhraseContext> queriedPhraseContexts) {
+    Set<String> cols = new HashSet<>();
+    for (QueriedPhraseContext qur : queriedPhraseContexts) {
+      cols.addAll(qur.getColumns());
+    }
+    return cols;
+  }
+
+  /**
+   * Removes from {@code candidates} every candidate that contains {@code filterCandidate}, and returns the removed
+   * candidates.
+   *
+   * @param candidates      candidates to filter; modified in place, so it must support iterator removal
+   * @param filterCandidate candidate to look for
+   * @return the candidates that were removed
+   */
+  public static Collection<Candidate> filterCandidates(Collection<Candidate> candidates, Candidate filterCandidate) {
+    List<Candidate> prunedCandidates = new ArrayList<>();
+    Iterator<Candidate> itr = candidates.iterator();
+    while (itr.hasNext()) {
+      // Read each element exactly once. A second next() call would move on to the following element, or throw
+      // NoSuchElementException on the last one.
+      Candidate candidate = itr.next();
+      if (candidate.contains(filterCandidate)) {
+        prunedCandidates.add(candidate);
+        itr.remove();
+      }
+    }
+    return prunedCandidates;
+  }
+
+  /**
+   * Gets all the storage candidates that participate in the collection of passed candidates.
+   *
+   * @param candidates candidates to expand
+   * @return participating storage candidates
+   */
+  public static Set<StorageCandidate> getStorageCandidates(Collection<Candidate> candidates) {
+    Set<StorageCandidate> storageCandidateSet = new HashSet<>();
+    getStorageCandidates(candidates, storageCandidateSet);
+    return storageCandidateSet;
+  }
+
+  /**
+   * Recursively adds the leaf candidates (those without children) to {@code storageCandidateSet}.
+   */
+  private static void getStorageCandidates(Collection<Candidate> candidates,
+    Set<StorageCandidate> storageCandidateSet) {
+    for (Candidate candidate : candidates) {
+      if (candidate.getChildren() == null) {
+        //Expecting this to be a StorageCandidate as it has no children.
+        storageCandidateSet.add((StorageCandidate) candidate);
+      } else {
+        getStorageCandidates(candidate.getChildren(), storageCandidateSet);
+      }
+    }
+  }
+
+  /**
+   * Creates a new storage candidate with the same cube, fact, storage name, alias and query context as {@code sc}.
+   *
+   * @param sc storage candidate to copy
+   * @return the copy
+   */
+  public static StorageCandidate cloneStorageCandidate(StorageCandidate sc) {
+    return new StorageCandidate(sc.getCube(), sc.getFact(), sc.getStorageName(), sc.getAlias(), sc.getCubeql());
+  }
+
+  /**
+   * Orders {@link UnionCandidate}s by their number of children, fewest first.
+   * The type parameter is unused and is kept only for source compatibility.
+   */
+  public static class UnionCandidateComparator<T> implements Comparator<UnionCandidate> {
+
+    @Override
+    public int compare(UnionCandidate o1, UnionCandidate o2) {
+      return Integer.compare(o1.getChildren().size(), o2.getChildren().size());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryContext.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryContext.java
index e83ae76..58fc5b1 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryContext.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryContext.java
@@ -102,9 +102,19 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST {
// Mapping of a qualified column name to its table alias
private final Map<String, String> colToTableAlias = new HashMap<>();
+ //TODO union: remove candidateFactSets and use candidates instead
@Getter
private final Set<Set<CandidateFact>> candidateFactSets = new HashSet<>();
+ /**
+ * This is the set of working Candidates that gets updated during different phases of
+ * query resolution. Each {@link ContextRewriter} may add/remove/update Candidates in
+ * this working set, and from the final set of Candidates a single {@link #pickedCandidate}
+ * is chosen.
+ */
+ @Getter
+ private final Set<Candidate> candidates = new HashSet<>();
+
@Getter
// would be added through join chains and de-normalized resolver
protected Map<Aliased<Dimension>, OptionalDimCtx> optionalDimensionMap = new HashMap<>();
@@ -177,9 +187,12 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST {
@Getter
@Setter
private DenormalizationResolver.DenormalizationContext deNormCtx;
+ //TODO union : deprecate factPruningMsgs
+ @Getter
+ @Deprecated
+ private PruneCauses<CubeFactTable> factPruningMsgs = new PruneCauses<>();
@Getter
- private PruneCauses<CubeFactTable> factPruningMsgs =
- new PruneCauses<CubeFactTable>();
+ private PruneCauses<StorageCandidate> storagePruningMsgs = new PruneCauses<>();
@Getter
private Map<Dimension, PruneCauses<CubeDimensionTable>> dimPruningMsgs =
new HashMap<Dimension, PruneCauses<CubeDimensionTable>>();
@@ -480,9 +493,36 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST {
return candidateDims;
}
+ /**
+ * Records a fact pruning message. No longer supported; calling it always throws.
+ * TODO union : remove this method once no callers remain.
+ *
+ * @param fact not used
+ * @param factPruningMsg not used
+ * @throws IllegalStateException always
+ * @deprecated pruning is now recorded per storage candidate; use
+ * {@link #addStoragePruningMsg(StorageCandidate, CandidateTablePruneCause)} or
+ * {@link #addCandidatePruningMsg(Candidate, CandidateTablePruneCause)} instead.
+ */
+ @Deprecated
public void addFactPruningMsgs(CubeFactTable fact, CandidateTablePruneCause factPruningMsg) {
+ throw new IllegalStateException("This method is deprecated");
+ }
+
+ //TODO union : not required as all the pruning happening at StorageCandidate
+ /*
+ public void addFactPruningMsg(CubeInterface cube, CubeFactTable fact, CandidateTablePruneCause factPruningMsg) {
log.info("Pruning fact {} with cause: {}", fact, factPruningMsg);
- factPruningMsgs.addPruningMsg(fact, factPruningMsg);
+ for (String storageName : fact.getStorages()) {
+ addStoragePruningMsg(new StorageCandidate(cube, fact, storageName), factPruningMsg);
+ }
+ }
+*/
+  /**
+   * Records the given prune cause against every {@link StorageCandidate} that participates in {@code cand}.
+   *
+   * @param cand           candidate being pruned; either a storage candidate or a candidate with children
+   * @param factPruningMsg reason for pruning
+   */
+  public void addCandidatePruningMsg(Candidate cand, CandidateTablePruneCause factPruningMsg) {
+    for (StorageCandidate participating : CandidateUtil.getStorageCandidates(cand)) {
+      addStoragePruningMsg(participating, factPruningMsg);
+    }
+  }
+
+ /**
+ * Logs the prune cause for the storage candidate and records it in {@code storagePruningMsgs}.
+ *
+ * @param sc pruned storage candidate
+ * @param factPruningMsg reason for pruning
+ */
+ public void addStoragePruningMsg(StorageCandidate sc, CandidateTablePruneCause factPruningMsg) {
+ log.info("Pruning Storage {} with cause: {}", sc, factPruningMsg);
+ storagePruningMsgs.addPruningMsg(sc, factPruningMsg);
}
public void addDimPruningMsgs(Dimension dim, CubeDimensionTable dimtable, CandidateTablePruneCause msg) {
@@ -675,6 +715,11 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST {
return qb.getParseInfo().getJoinExpr();
}
+ @Override
+ public void setJoinAST(ASTNode node) {
+ //NO-OP: CubeQueryContext does not support replacing the join AST; node is ignored
+ }
+
public String getOrderByString() {
if (orderByAST != null) {
return HQLParser.getString(orderByAST);
@@ -769,6 +814,7 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST {
}
}
+ // TODO union : Reevaluate this method.
void setNonexistingParts(Map<String, Set<String>> nonExistingParts) throws LensException {
if (!nonExistingParts.isEmpty()) {
ByteArrayOutputStream out = null;
@@ -873,8 +919,14 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST {
}
private HQLContextInterface hqlContext;
+
+ //TODO union : Delete this and use pickedCandidate
@Getter
private Collection<CandidateFact> pickedFacts;
+
+ @Getter
+ //TODO union : This will be the final picked Candidate, replacing pickedFacts
+ private Candidate pickedCandidate;
@Getter
private Collection<CandidateDim> pickedDimTables;
@@ -1211,6 +1263,8 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST {
*
* @param pruneCause
*/
+ //TODO union : deprecated
+ @Deprecated
public void pruneCandidateFactSet(CandidateTablePruneCode pruneCause) {
// remove candidate fact sets that have missing facts
for (Iterator<Set<CandidateFact>> i = candidateFactSets.iterator(); i.hasNext();) {
@@ -1237,6 +1291,8 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST {
pruneCandidateFactWithCandidateSet(new CandidateTablePruneCause(pruneCause));
}
+ //TODO union : deprecated
+ @Deprecated
public void pruneCandidateFactWithCandidateSet(CandidateTablePruneCause pruneCause) {
// remove candidate facts that are not part of any covering set
Set<CandidateFact> allCoveringFacts = new HashSet<CandidateFact>();
@@ -1253,6 +1309,7 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST {
}
}
+
public void addQueriedTimeDimensionCols(final String timeDimColName) {
checkArgument(StringUtils.isNotBlank(timeDimColName));
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryRewriter.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryRewriter.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryRewriter.java
index b612173..3ff6070 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryRewriter.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryRewriter.java
@@ -150,6 +150,10 @@ public class CubeQueryRewriter {
// Resolve candidate fact tables and dimension tables for columns queried
rewriters.add(candidateTblResolver);
// Resolve aggregations and generate base select tree
+ rewriters.add(new CandidateCoveringSetsResolver(conf));
+
+ //TODO union: Add CoveringSetResolver which creates UnionCandidates and JoinCandidates. Some code from candidateTblResolver (phase 2) is to be moved to CoveringSetResolver
+ //TODO union: AggregateResolver,GroupbyResolver,FieldValidator before CoveringSetResolver
rewriters.add(new AggregateResolver());
rewriters.add(new GroupbyResolver(conf));
rewriters.add(new FieldValidator());
@@ -159,12 +163,15 @@ public class CubeQueryRewriter {
rewriters.add(new TimeRangeChecker(conf));
// Resolve candidate fact tables and dimension tables for columns included
// in join and denorm resolvers
+ //TODO union : this should be CoveringSetResolver now
rewriters.add(candidateTblResolver);
// Phase 1: resolve fact tables.
+ //TODO union: This phase 1 of storageTableResolver should happen before CoveringSetResolver
rewriters.add(storageTableResolver);
if (lightFactFirst) {
// Prune candidate tables for which denorm column references do not exist
+ //TODO union: phase 2 of denormResolver needs to be moved before CoveringSetResolver
rewriters.add(denormResolver);
// Prune candidate facts without any valid expressions
rewriters.add(exprResolver);
@@ -176,6 +183,7 @@ public class CubeQueryRewriter {
// Phase 3: resolve dimension tables and partitions.
rewriters.add(storageTableResolver);
// Prune candidate tables for which denorm column references do not exist
+ //TODO union: phase 2 of denormResolver needs to be moved before CoveringSetResolver.. check if this makes sense
rewriters.add(denormResolver);
// Prune candidate facts without any valid expressions
rewriters.add(exprResolver);
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/DenormalizationResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/DenormalizationResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/DenormalizationResolver.java
index 40ed387..d8f1ab4 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/DenormalizationResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/DenormalizationResolver.java
@@ -345,20 +345,27 @@ public class DenormalizationResolver implements ContextRewriter {
// In the second iteration of denorm resolver
// candidate tables which require denorm fields and the refernces are no
// more valid will be pruned
- if (cubeql.getCube() != null && !cubeql.getCandidateFacts().isEmpty()) {
- for (Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator(); i.hasNext();) {
- CandidateFact cfact = i.next();
- if (denormCtx.tableToRefCols.containsKey(cfact.getName())) {
- for (ReferencedQueriedColumn refcol : denormCtx.tableToRefCols.get(cfact.getName())) {
- if (denormCtx.getReferencedCols().get(refcol.col.getName()).isEmpty()) {
- log.info("Not considering fact table:{} as column {} is not available", cfact, refcol.col);
- cubeql.addFactPruningMsgs(cfact.fact, CandidateTablePruneCause.columnNotFound(refcol.col.getName()));
- i.remove();
+ if (cubeql.getCube() != null && !cubeql.getCandidates().isEmpty()) {
+ for (Iterator<Candidate> i = cubeql.getCandidates().iterator(); i.hasNext();) {
+ Candidate cand = i.next();
+ //TODO union : is this happening in phase 1 or 2 ?
+ //TODO Union : If phase 2, the below code will not work. Move to phase1 in that case
+ if (cand instanceof StorageCandidate) {
+ StorageCandidate sc = (StorageCandidate) cand;
+ if (denormCtx.tableToRefCols.containsKey(sc.getFact().getName())) {
+ for (ReferencedQueriedColumn refcol : denormCtx.tableToRefCols.get(sc.getFact().getName())) {
+ if (denormCtx.getReferencedCols().get(refcol.col.getName()).isEmpty()) {
+ log.info("Not considering storage candidate :{} as column {} is not available", sc, refcol.col);
+ cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.columnNotFound(refcol.col.getName()));
+ i.remove();
+ }
}
}
+ } else {
+ throw new LensException("Not a storage candidate!!");
}
}
- if (cubeql.getCandidateFacts().size() == 0) {
+ if (cubeql.getCandidates().size() == 0) {
throw new LensException(LensCubeErrorCode.NO_FACT_HAS_COLUMN.getLensErrorInfo(),
cubeql.getColumnsQueriedForTable(cubeql.getCube().getName()).toString());
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/ExpressionResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/ExpressionResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/ExpressionResolver.java
index 60dacdb..1b8c560 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/ExpressionResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/ExpressionResolver.java
@@ -647,42 +647,38 @@ class ExpressionResolver implements ContextRewriter {
// prune invalid expressions
cubeql.getExprCtx().pruneExpressions();
// prune candidate facts without any valid expressions
- if (cubeql.getCube() != null && !cubeql.getCandidateFacts().isEmpty()) {
+ if (cubeql.getCube() != null && !cubeql.getCandidates().isEmpty()) {
for (Map.Entry<String, Set<ExpressionContext>> ecEntry : exprCtx.allExprsQueried.entrySet()) {
String expr = ecEntry.getKey();
Set<ExpressionContext> ecSet = ecEntry.getValue();
for (ExpressionContext ec : ecSet) {
if (ec.getSrcTable().getName().equals(cubeql.getCube().getName())) {
if (cubeql.getQueriedExprsWithMeasures().contains(expr)) {
- for (Iterator<Set<CandidateFact>> sItr = cubeql.getCandidateFactSets().iterator(); sItr.hasNext();) {
- Set<CandidateFact> factSet = sItr.next();
- boolean evaluableInSet = false;
- for (CandidateFact cfact : factSet) {
- if (ec.isEvaluable(cfact)) {
- evaluableInSet = true;
- }
- }
- if (!evaluableInSet) {
- log.info("Not considering fact table set:{} as {} is not evaluable", factSet, ec.exprCol.getName());
+ for (Iterator<Candidate> sItr = cubeql.getCandidates().iterator(); sItr.hasNext(); ) {
+ Candidate cand = sItr.next();
+ if (!cand.isExpressionEvaluable(ec)) {
+ log.info("Not considering Candidate :{} as {} is not evaluable", cand, ec.exprCol.getName());
sItr.remove();
}
}
} else {
- for (Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator(); i.hasNext();) {
- CandidateFact cfact = i.next();
- if (!ec.isEvaluable(cfact)) {
- log.info("Not considering fact table:{} as {} is not evaluable", cfact, ec.exprCol.getName());
- cubeql.addFactPruningMsgs(cfact.fact,
- CandidateTablePruneCause.expressionNotEvaluable(ec.exprCol.getName()));
- i.remove();
+ // prune dimension only expressions
+ Set<StorageCandidate> storageCandidates = CandidateUtil.getStorageCandidates(cubeql.getCandidates());
+ for (StorageCandidate sc : storageCandidates) {
+ if (!sc.isExpressionEvaluable(ec)) {
+ Collection<Candidate> prunedCandidates =
+ CandidateUtil.filterCandidates(cubeql.getCandidates(), sc);
+ log.info("Not considering candidate(s) :{} as expr :{} in storage :{} is not evaluable",
+ prunedCandidates, ec.exprCol.getName(), sc);
+ cubeql.addStoragePruningMsg(sc,
+ CandidateTablePruneCause.expressionNotEvaluable(ec.exprCol.getName()));
}
}
- }
}
}
}
- cubeql.pruneCandidateFactWithCandidateSet(CandidateTablePruneCode.EXPRESSION_NOT_EVALUABLE);
}
+ }
// prune candidate dims without any valid expressions
if (cubeql.getDimensions() != null && !cubeql.getDimensions().isEmpty()) {
for (Dimension dim : cubeql.getDimensions()) {
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinCandidate.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinCandidate.java
new file mode 100644
index 0000000..7781ba6
--- /dev/null
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinCandidate.java
@@ -0,0 +1,119 @@
+package org.apache.lens.cube.parse;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Set;
+
+import org.apache.lens.cube.metadata.FactPartition;
+import org.apache.lens.cube.metadata.TimeRange;
+import org.apache.lens.server.api.error.LensException;
+
+import lombok.Getter;
+
+/**
+ * Represents a join of two candidates
+ */
+public class JoinCandidate implements Candidate {
+
+ /**
+ * Child candidates that will participate in the join
+ */
+ private Candidate childCandidate1;
+ private Candidate childCandidate2;
+ private String toStr;
+ @Getter
+ private String alias;
+
+ public JoinCandidate(Candidate childCandidate1, Candidate childCandidate2, String alias) {
+ this.childCandidate1 = childCandidate1;
+ this.childCandidate2 = childCandidate2;
+ this.alias = alias;
+ }
+
+ private String getJoinCondition() {
+ return null;
+ }
+
+ @Override
+ public String toHQL() {
+ return null;
+ }
+
+ @Override
+ public QueryAST getQueryAst() {
+ return null;
+ }
+
+ @Override
+ public Collection<String> getColumns() {
+ return null;
+ }
+
+ @Override
+ public Date getStartTime() {
+ return childCandidate1.getStartTime().after(childCandidate2.getStartTime())
+ ? childCandidate1.getStartTime()
+ : childCandidate2.getStartTime();
+ }
+
+ @Override
+ public Date getEndTime() {
+ return childCandidate1.getEndTime().before(childCandidate2.getEndTime())
+ ? childCandidate1.getEndTime()
+ : childCandidate2.getEndTime();
+ }
+
+ @Override
+ public double getCost() {
+ return childCandidate1.getCost() + childCandidate2.getCost();
+ }
+
+ @Override
+ public boolean contains(Candidate candidate) {
+ if (this.equals(candidate)) {
+ return true;
+ } else
+ return childCandidate1.contains(candidate) || childCandidate2.contains(candidate);
+ }
+
+ @Override
+ public Collection<Candidate> getChildren() {
+ return new ArrayList() {{
+ add(childCandidate1);
+ add(childCandidate2);
+ }};
+ }
+
+ /**
+ * @param timeRange
+ * @return
+ */
+ @Override
+ public boolean evaluateCompleteness(TimeRange timeRange, boolean failOnPartialData) throws LensException {
+ return this.childCandidate1.evaluateCompleteness(timeRange, failOnPartialData) && this.childCandidate2
+ .evaluateCompleteness(timeRange, failOnPartialData);
+ }
+
+ @Override
+ public Set<FactPartition> getParticipatingPartitions() {
+ return null;
+ }
+
+ @Override
+ public boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr) {
+ return childCandidate1.isExpressionEvaluable(expr) || childCandidate1.isExpressionEvaluable(expr);
+ }
+
+ @Override
+ public String toString() {
+ if (this.toStr == null) {
+ this.toStr = getToString();
+ }
+ return this.toStr;
+ }
+
+ private String getToString() {
+ return this.toStr = "JOIN[" + childCandidate1.toString() + ", " + childCandidate2.toString() + "]";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinResolver.java
index 7b865bf..0370964 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinResolver.java
@@ -43,7 +43,10 @@ import lombok.extern.slf4j.Slf4j;
class JoinResolver implements ContextRewriter {
private Map<AbstractCubeTable, JoinType> tableJoinTypeMap;
private AbstractCubeTable target;
- private HashMap<Dimension, List<JoinChain>> dimensionInJoinChain = new HashMap<Dimension, List<JoinChain>>();
+ /**
+ * Dimension as key and all the participating join chains for this dimension as value.
+ */
+ private HashMap<Dimension, List<JoinChain>> dimensionToJoinChainsMap = new HashMap<Dimension, List<JoinChain>>();
public JoinResolver(Configuration conf) {
}
@@ -95,10 +98,10 @@ class JoinResolver implements ContextRewriter {
dims.add(chain.getDestTable());
for (String dim : dims) {
Dimension dimension = cubeql.getMetastoreClient().getDimension(dim);
- if (dimensionInJoinChain.get(dimension) == null) {
- dimensionInJoinChain.put(dimension, new ArrayList<JoinChain>());
+ if (dimensionToJoinChainsMap.get(dimension) == null) {
+ dimensionToJoinChainsMap.put(dimension, new ArrayList<JoinChain>());
}
- dimensionInJoinChain.get(dimension).add(chain);
+ dimensionToJoinChainsMap.get(dimension).add(chain);
}
}
}
@@ -143,7 +146,7 @@ class JoinResolver implements ContextRewriter {
Map<Aliased<Dimension>, List<JoinPath>> multipleJoinPaths = new LinkedHashMap<>();
- // populate paths from joinchains
+ // populate paths from joinchains. For a destination Dimension get all the join paths that lead to it.
for (JoinChain chain : cubeql.getJoinchains().values()) {
Dimension dimension = cubeql.getMetastoreClient().getDimension(chain.getDestTable());
Aliased<Dimension> aliasedDimension = Aliased.create(dimension, chain.getName());
@@ -153,6 +156,7 @@ class JoinResolver implements ContextRewriter {
multipleJoinPaths.get(aliasedDimension).addAll(
chain.getRelationEdges(cubeql.getMetastoreClient()));
}
+
boolean flattenBridgeTables = cubeql.getConf().getBoolean(CubeQueryConfUtil.ENABLE_FLATTENING_FOR_BRIDGETABLES,
CubeQueryConfUtil.DEFAULT_ENABLE_FLATTENING_FOR_BRIDGETABLES);
String bridgeTableFieldAggr = cubeql.getConf().get(CubeQueryConfUtil.BRIDGE_TABLE_FIELD_AGGREGATOR,
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/LightestFactResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/LightestFactResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/LightestFactResolver.java
index 97accbb..077c0d2 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/LightestFactResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/LightestFactResolver.java
@@ -38,32 +38,24 @@ public class LightestFactResolver implements ContextRewriter {
@Override
public void rewriteContext(CubeQueryContext cubeql) throws LensException {
- if (cubeql.getCube() != null && !cubeql.getCandidateFactSets().isEmpty()) {
- Map<Set<CandidateFact>, Double> factWeightMap = new HashMap<Set<CandidateFact>, Double>();
+ if (cubeql.getCube() != null && !cubeql.getCandidates().isEmpty()) { // prune every candidate costlier than the cheapest
+ Map<Candidate, Double> factWeightMap = new HashMap<Candidate, Double>(); // candidate -> Candidate#getCost()
- for (Set<CandidateFact> facts : cubeql.getCandidateFactSets()) {
- factWeightMap.put(facts, getWeight(facts));
+ for (Candidate cand : cubeql.getCandidates()) {
+ factWeightMap.put(cand, cand.getCost());
}
double minWeight = Collections.min(factWeightMap.values());
- for (Iterator<Set<CandidateFact>> i = cubeql.getCandidateFactSets().iterator(); i.hasNext();) {
- Set<CandidateFact> facts = i.next();
- if (factWeightMap.get(facts) > minWeight) {
- log.info("Not considering facts:{} from candidate fact tables as it has more fact weight:{} minimum:{}",
- facts, factWeightMap.get(facts), minWeight);
+ for (Iterator<Candidate> i = cubeql.getCandidates().iterator(); i.hasNext();) {
+ Candidate cand = i.next();
+ if (factWeightMap.get(cand) > minWeight) { // ties at the minimum cost are all kept
+ log.info("Not considering candidate:{} from final candidates as it has more fact weight:{} minimum:{}",
+ cand, factWeightMap.get(cand), minWeight); // NOTE(review): "fact weight" here is now Candidate#getCost()
+ cubeql.addCandidatePruningMsg(cand, new CandidateTablePruneCause(CandidateTablePruneCode.MORE_WEIGHT));
i.remove();
}
}
- cubeql.pruneCandidateFactWithCandidateSet(CandidateTablePruneCode.MORE_WEIGHT);
}
}
-
- private Double getWeight(Set<CandidateFact> set) {
- Double weight = 0.0;
- for (CandidateFact f : set) {
- weight += f.fact.weight();
- }
- return weight;
- }
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/MaxCoveringFactResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/MaxCoveringFactResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/MaxCoveringFactResolver.java
index 45824fe..57c9c44 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/MaxCoveringFactResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/MaxCoveringFactResolver.java
@@ -67,6 +67,8 @@ class MaxCoveringFactResolver implements ContextRewriter {
// For each part column, which candidate fact sets are covering how much amount.
// Later, we'll maximize coverage for each queried part column.
Map<String, Map<Set<CandidateFact>, Long>> partCountsPerPartCol = Maps.newHashMap();
+ //TODO union: max covering set will be calculated based on List<Candidate>
+ //TODO union: Each candidate will provide Set<FactPartition> via {@link Candidate#getParticipatingPartitions()}
for (Set<CandidateFact> facts : cubeql.getCandidateFactSets()) {
for (Map.Entry<String, Long> entry : getTimeCoveredForEachPartCol(facts).entrySet()) {
if (!partCountsPerPartCol.containsKey(entry.getKey())) {
@@ -114,6 +116,7 @@ class MaxCoveringFactResolver implements ContextRewriter {
}
// We prune those candidate fact set, whose dataCompletenessFactor is less than maxDataCompletenessFactor
+ //TODO union : This needs to work on List<Candidate>
Iterator<Set<CandidateFact>> iter = cubeql.getCandidateFactSets().iterator();
while (iter.hasNext()) {
Set<CandidateFact> facts = iter.next();
@@ -127,6 +130,7 @@ class MaxCoveringFactResolver implements ContextRewriter {
cubeql.pruneCandidateFactWithCandidateSet(CandidateTablePruneCause.incompletePartitions(null));
}
+ //TODO union : This needs to work on Candidate
private float computeDataCompletenessFactor(Set<CandidateFact> facts) {
float completenessFactor = 0f;
int numPartition = 0;
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/PruneCauses.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/PruneCauses.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/PruneCauses.java
index 9b5a52f..c17e5bf 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/PruneCauses.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/PruneCauses.java
@@ -36,7 +36,7 @@ import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
-public class PruneCauses<T extends AbstractCubeTable> extends HashMap<T, List<CandidateTablePruneCause>> {
+public class PruneCauses<T> extends HashMap<T, List<CandidateTablePruneCause>> {
@Getter(lazy = true)
private final HashMap<CandidateTablePruneCause, List<T>> reversed = reverse();
@Getter(lazy = true)
@@ -66,7 +66,7 @@ public class PruneCauses<T extends AbstractCubeTable> extends HashMap<T, List<Ca
get(table).add(msg);
}
- public HashMap<CandidateTablePruneCause, List<T>> reverse() {
+ private HashMap<CandidateTablePruneCause, List<T>> reverse() {
HashMap<CandidateTablePruneCause, List<T>> result = new HashMap<CandidateTablePruneCause, List<T>>();
for (T key : keySet()) {
for (CandidateTablePruneCause value : get(key)) {
@@ -103,7 +103,7 @@ public class PruneCauses<T extends AbstractCubeTable> extends HashMap<T, List<Ca
Map<CandidateTablePruneCause, String> maxCauseMap = Maps.newHashMap();
for (Map.Entry<CandidateTablePruneCause, List<T>> entry: getReversed().entrySet()) {
if (entry.getKey().getCause().equals(maxCause)) {
- maxCauseMap.put(entry.getKey(), StringUtils.join(entry.getValue(), ","));
+ maxCauseMap.put(entry.getKey(), StringUtils.join(entry.getValue(), ","));
}
}
return maxCause.getBriefError(maxCauseMap.keySet());