Posted to commits@lens.apache.org by pr...@apache.org on 2017/04/12 13:03:57 UTC

[14/20] lens git commit: LENS-1381: Support Fact to Fact Union
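
This commit reworks storage resolution around the new Candidate abstraction:
resolveStoragePartitions() (in the StorageTableResolver diff below) drops a
candidate when it cannot cover one of the queried time ranges and, when
CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA is set, when its data is
incomplete. A minimal, self-contained sketch of that pruning decision follows;
Candidate here is a hypothetical stand-in for the real
org.apache.lens.cube.parse.Candidate, with time ranges reduced to strings:

    import java.util.Iterator;
    import java.util.List;

    public class PruningSketch {
      // hypothetical stand-in for org.apache.lens.cube.parse.Candidate
      interface Candidate {
        boolean isTimeRangeCoverable(String range);
        boolean evaluateCompleteness(String range, boolean failOnPartialData);
      }

      static void prune(List<Candidate> candidates, List<String> ranges, boolean failOnPartialData) {
        Iterator<Candidate> it = candidates.iterator();
        while (it.hasNext()) {
          Candidate c = it.next();
          boolean coverable = true;
          boolean complete = true;
          for (String range : ranges) {
            if (!c.isTimeRangeCoverable(range)) {
              coverable = false; // candidate cannot answer this range at all
              break;
            }
            complete &= c.evaluateCompleteness(range, failOnPartialData);
          }
          // the real resolver also records a CandidateTablePruneCause before removing
          if (!coverable || (failOnPartialData && !complete)) {
            it.remove();
          }
        }
      }
    }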

http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
index 2b63193..10c3bbe 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -18,35 +18,21 @@
  */
 package org.apache.lens.cube.parse;
 
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.incompletePartitions;
+import static org.apache.lens.cube.parse.CandidateTablePruneCause.partitionColumnsMissing;
 
 import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.lens.cube.metadata.DateUtil.WSPACE;
-import static org.apache.lens.cube.metadata.MetastoreUtil.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.*;
-import static org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCode.*;
 
 import org.apache.lens.cube.metadata.*;
-import org.apache.lens.cube.parse.CandidateTablePruneCause.*;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode;
+import org.apache.lens.cube.parse.CandidateTablePruneCause.SkipUpdatePeriodCode;
 import org.apache.lens.server.api.error.LensException;
-import org.apache.lens.server.api.metastore.*;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import lombok.extern.slf4j.Slf4j;
-
 /**
  * Resolves storages and partitions of all candidate tables and prunes candidate tables with missing storages or
  * partitions.
@@ -57,35 +43,13 @@ class StorageTableResolver implements ContextRewriter {
   private final Configuration conf;
   private final List<String> supportedStorages;
   private final boolean allStoragesSupported;
-  CubeMetastoreClient client;
   private final boolean failOnPartialData;
   private final List<String> validDimTables;
-  private final Map<CubeFactTable, Map<UpdatePeriod, Set<String>>> validStorageMap = new HashMap<>();
-  private String processTimePartCol = null;
   private final UpdatePeriod maxInterval;
+  // TODO union : Remove this. All partitions are stored in the StorageCandidate.
   private final Map<String, Set<String>> nonExistingPartitions = new HashMap<>();
-  private TimeRangeWriter rangeWriter;
-  private DateFormat partWhereClauseFormat = null;
+  private CubeMetastoreClient client;
   private PHASE phase;
-  private HashMap<CubeFactTable, Map<String, SkipStorageCause>> skipStorageCausesPerFact;
-  private float completenessThreshold;
-  private String completenessPartCol;
-
-  enum PHASE {
-    FACT_TABLES, FACT_PARTITIONS, DIM_TABLE_AND_PARTITIONS;
-
-    static PHASE first() {
-      return values()[0];
-    }
-
-    static PHASE last() {
-      return values()[values().length - 1];
-    }
-
-    PHASE next() {
-      return values()[(this.ordinal() + 1) % values().length];
-    }
-  }
 
   StorageTableResolver(Configuration conf) {
     this.conf = conf;
@@ -94,24 +58,13 @@ class StorageTableResolver implements ContextRewriter {
     this.failOnPartialData = conf.getBoolean(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, false);
     String str = conf.get(CubeQueryConfUtil.VALID_STORAGE_DIM_TABLES);
     validDimTables = StringUtils.isBlank(str) ? null : Arrays.asList(StringUtils.split(str.toLowerCase(), ","));
-    this.processTimePartCol = conf.get(CubeQueryConfUtil.PROCESS_TIME_PART_COL);
     String maxIntervalStr = conf.get(CubeQueryConfUtil.QUERY_MAX_INTERVAL);
     if (maxIntervalStr != null) {
-      this.maxInterval = UpdatePeriod.valueOf(maxIntervalStr);
+      this.maxInterval = UpdatePeriod.valueOf(maxIntervalStr.toUpperCase());
     } else {
       this.maxInterval = null;
     }
-    rangeWriter =
-      ReflectionUtils.newInstance(conf.getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS,
-        CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER, TimeRangeWriter.class), this.conf);
-    String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT);
-    if (formatStr != null) {
-      partWhereClauseFormat = new SimpleDateFormat(formatStr);
-    }
     this.phase = PHASE.first();
-    completenessThreshold = conf.getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD,
-            CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD);
-    completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL);
   }
 
   private List<String> getSupportedStorages(Configuration conf) {
@@ -122,55 +75,88 @@ class StorageTableResolver implements ContextRewriter {
     return null;
   }
 
-  public boolean isStorageSupported(String storage) {
+  private boolean isStorageSupportedOnDriver(String storage) {
     return allStoragesSupported || supportedStorages.contains(storage);
   }
 
-  Map<String, List<String>> storagePartMap = new HashMap<String, List<String>>();
-
   @Override
   public void rewriteContext(CubeQueryContext cubeql) throws LensException {
     client = cubeql.getMetastoreClient();
 
     switch (phase) {
-    case FACT_TABLES:
-      if (!cubeql.getCandidateFacts().isEmpty()) {
-        // resolve storage table names
-        resolveFactStorageTableNames(cubeql);
+    case STORAGE_TABLES:
+      if (!cubeql.getCandidates().isEmpty()) {
+        resolveStorageTable(cubeql);
       }
-      cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES);
       break;
-    case FACT_PARTITIONS:
-      if (!cubeql.getCandidateFacts().isEmpty()) {
-        // resolve storage partitions
-        resolveFactStoragePartitions(cubeql);
-      }
-      cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES);
-      if (client != null && client.isDataCompletenessCheckEnabled()) {
-        if (!cubeql.getCandidateFacts().isEmpty()) {
-          // resolve incomplete fact partition
-          resolveFactCompleteness(cubeql);
-        }
-        cubeql.pruneCandidateFactSet(CandidateTablePruneCode.INCOMPLETE_PARTITION);
+    case STORAGE_PARTITIONS:
+      if (!cubeql.getCandidates().isEmpty()) {
+        resolveStoragePartitions(cubeql);
       }
       break;
     case DIM_TABLE_AND_PARTITIONS:
       resolveDimStorageTablesAndPartitions(cubeql);
       if (cubeql.getAutoJoinCtx() != null) {
         // After all candidates are pruned after storage resolver, prune join paths.
-        cubeql.getAutoJoinCtx().pruneAllPaths(cubeql.getCube(), cubeql.getCandidateFacts(), null);
+        cubeql.getAutoJoinCtx()
+          .pruneAllPaths(cubeql.getCube(), CandidateUtil.getStorageCandidates(cubeql.getCandidates()), null);
         cubeql.getAutoJoinCtx().pruneAllPathsForCandidateDims(cubeql.getCandidateDimTables());
         cubeql.getAutoJoinCtx().refreshJoinPathColumns();
       }
+      // TODO union : What is this? We may not need this, as non-existing partitions are stored in the StorageCandidate.
+      cubeql.setNonexistingParts(nonExistingPartitions);
       break;
     }
-    //Doing this on all three phases. Keep updating cubeql with the current identified missing partitions.
-    cubeql.setNonexistingParts(nonExistingPartitions);
     phase = phase.next();
   }
 
+  /**
+   * Each candidate in the set is a complex candidate. We will evaluate each one to get
+   * all the partitions needed to answer the query.
+   *
+   * @param cubeql cube query context
+   */
+  private void resolveStoragePartitions(CubeQueryContext cubeql) throws LensException {
+    Iterator<Candidate> candidateIterator = cubeql.getCandidates().iterator();
+    while (candidateIterator.hasNext()) {
+      Candidate candidate = candidateIterator.next();
+      boolean isComplete = true;
+      boolean isTimeRangeAnswerableByThisCandidate = true;
+      for (TimeRange range : cubeql.getTimeRanges()) {
+        if (!candidate.isTimeRangeCoverable(range)) {
+          isTimeRangeAnswerableByThisCandidate = false;
+          log.info("Not considering candidate:{} as it can not cover time range {}", candidate, range);
+          cubeql.addCandidatePruningMsg(candidate,
+            CandidateTablePruneCause.storageNotAvailableInRange(Lists.newArrayList(range)));
+          break;
+        }
+        isComplete &= candidate.evaluateCompleteness(range, range, failOnPartialData);
+      }
+      if (!isTimeRangeAnswerableByThisCandidate) {
+        candidateIterator.remove();
+      } else if (failOnPartialData && !isComplete) {
+        candidateIterator.remove();
+        log.info("Not considering candidate:{} as its data is not is not complete", candidate);
+        Set<StorageCandidate> scSet = CandidateUtil.getStorageCandidates(candidate);
+        for (StorageCandidate sc : scSet) {
+          if (!sc.getNonExistingPartitions().isEmpty()) {
+            cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.missingPartitions(sc.getNonExistingPartitions()));
+          } else if (!sc.getDataCompletenessMap().isEmpty()) {
+            cubeql.addStoragePruningMsg(sc, incompletePartitions(sc.getDataCompletenessMap()));
+          }
+        }
+      } else if (candidate.getParticipatingPartitions().isEmpty()
+        && candidate instanceof StorageCandidate
+        && ((StorageCandidate) candidate).getNonExistingPartitions().isEmpty()) {
+        candidateIterator.remove();
+        cubeql.addCandidatePruningMsg(candidate,
+          new CandidateTablePruneCause(CandidateTablePruneCode.NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE));
+      }
+    }
+  }
+
   private void resolveDimStorageTablesAndPartitions(CubeQueryContext cubeql) throws LensException {
-    Set<Dimension> allDims = new HashSet<Dimension>(cubeql.getDimensions());
+    Set<Dimension> allDims = new HashSet<>(cubeql.getDimensions());
     for (Aliased<Dimension> dim : cubeql.getOptionalDimensions()) {
       allDims.add(dim.getObject());
     }
@@ -184,21 +170,23 @@ class StorageTableResolver implements ContextRewriter {
         CandidateDim candidate = i.next();
         CubeDimensionTable dimtable = candidate.dimtable;
         if (dimtable.getStorages().isEmpty()) {
-          cubeql.addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause(
-            CandidateTablePruneCode.MISSING_STORAGES));
+          cubeql
+            .addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES));
           i.remove();
           continue;
         }
-        Set<String> storageTables = new HashSet<String>();
+        Set<String> storageTables = new HashSet<>();
         Map<String, String> whereClauses = new HashMap<String, String>();
         boolean foundPart = false;
-        Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>();
+        // TODO union : We have to remove all usages of a deprecated class.
+        Map<String, CandidateTablePruneCode> skipStorageCauses = new HashMap<>();
         for (String storage : dimtable.getStorages()) {
-          if (isStorageSupported(storage)) {
-            String tableName = getFactOrDimtableStorageTableName(dimtable.getName(), storage).toLowerCase();
+          if (isStorageSupportedOnDriver(storage)) {
+            String tableName = MetastoreUtil.getFactOrDimtableStorageTableName(dimtable.getName(),
+                storage).toLowerCase();
             if (validDimTables != null && !validDimTables.contains(tableName)) {
               log.info("Not considering dim storage table:{} as it is not a valid dim storage", tableName);
-              skipStorageCauses.put(tableName, new SkipStorageCause(SkipStorageCode.INVALID));
+              skipStorageCauses.put(tableName, CandidateTablePruneCode.INVALID);
               continue;
             }
 
@@ -212,13 +200,12 @@ class StorageTableResolver implements ContextRewriter {
               }
               if (!failOnPartialData || foundPart) {
                 storageTables.add(tableName);
-                String whereClause =
-                  StorageUtil.getWherePartClause(dim.getTimedDimension(), null,
-                    StorageConstants.getPartitionsForLatest());
+                String whereClause = StorageUtil
+                  .getWherePartClause(dim.getTimedDimension(), null, StorageConstants.getPartitionsForLatest());
                 whereClauses.put(tableName, whereClause);
               } else {
                 log.info("Not considering dim storage table:{} as no dim partitions exist", tableName);
-                skipStorageCauses.put(tableName, new SkipStorageCause(SkipStorageCode.NO_PARTITIONS));
+                skipStorageCauses.put(tableName, CandidateTablePruneCode.NO_PARTITIONS);
               }
             } else {
               storageTables.add(tableName);
@@ -226,7 +213,7 @@ class StorageTableResolver implements ContextRewriter {
             }
           } else {
             log.info("Storage:{} is not supported", storage);
-            skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.UNSUPPORTED));
+            skipStorageCauses.put(storage, CandidateTablePruneCode.UNSUPPORTED_STORAGE);
           }
         }
         if (!foundPart) {
@@ -234,7 +221,8 @@ class StorageTableResolver implements ContextRewriter {
         }
         if (storageTables.isEmpty()) {
           log.info("Not considering dim table:{} as no candidate storage tables eixst", dimtable);
-          cubeql.addDimPruningMsgs(dim, dimtable, noCandidateStorages(skipStorageCauses));
+          cubeql.addDimPruningMsgs(dim, dimtable,
+              CandidateTablePruneCause.noCandidateStoragesForDimtable(skipStorageCauses));
           i.remove();
           continue;
         }
@@ -245,619 +233,151 @@ class StorageTableResolver implements ContextRewriter {
     }
   }
 
-  // Resolves all the storage table names, which are valid for each updatePeriod
-  private void resolveFactStorageTableNames(CubeQueryContext cubeql) throws LensException {
-    Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
-    skipStorageCausesPerFact = new HashMap<>();
-    while (i.hasNext()) {
-      CubeFactTable fact = i.next().fact;
-      if (fact.getUpdatePeriods().isEmpty()) {
-        cubeql.addFactPruningMsgs(fact, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES));
-        i.remove();
+  /**
+   * The following storages are removed:
+   * 1. The storage is not supported by the driver.
+   * 2. The storage is not in the valid storage list.
+   * 3. The storage is not valid for any time range in the query.
+   * 4. The storage has no valid update period.
+   *
+   * This method also creates a list of valid update periods and stores them into {@link StorageCandidate}.
+   *
+   * TODO union : Do fourth point before 3.
+   */
+  private void resolveStorageTable(CubeQueryContext cubeql) throws LensException {
+    Iterator<Candidate> it = cubeql.getCandidates().iterator();
+    while (it.hasNext()) {
+      Candidate c = it.next();
+      assert (c instanceof StorageCandidate);
+      StorageCandidate sc = (StorageCandidate) c;
+      String storageTable = sc.getStorageName();
+      // first check: if the storage is supported on driver
+      if (!isStorageSupportedOnDriver(storageTable)) {
+        log.info("Skipping storage: {} as it is not supported", storageTable);
+        cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.UNSUPPORTED_STORAGE));
+        it.remove();
         continue;
       }
-      Map<UpdatePeriod, Set<String>> storageTableMap = new TreeMap<UpdatePeriod, Set<String>>();
-      validStorageMap.put(fact, storageTableMap);
-      String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(fact.getName()));
+      String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(sc.getFact().getName()));
       List<String> validFactStorageTables =
         StringUtils.isBlank(str) ? null : Arrays.asList(StringUtils.split(str.toLowerCase(), ","));
-      Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>();
-
-      for (Map.Entry<String, Set<UpdatePeriod>> entry : fact.getUpdatePeriods().entrySet()) {
-        String storage = entry.getKey();
-        // skip storages that are not supported
-        if (!isStorageSupported(storage)) {
-          log.info("Skipping storage: {} as it is not supported", storage);
-          skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.UNSUPPORTED));
-          continue;
-        }
-        String table = getStorageTableName(fact, storage, validFactStorageTables);
-        // skip the update period if the storage is not valid
-        if (table == null) {
-          skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.INVALID));
-          continue;
-        }
-        List<String> validUpdatePeriods =
-          CubeQueryConfUtil.getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(fact.getName(), storage));
-
-        boolean isStorageAdded = false;
-        Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<String, SkipUpdatePeriodCode>();
-        for (UpdatePeriod updatePeriod : entry.getValue()) {
-          if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) {
-            log.info("Skipping update period {} for fact {}", updatePeriod, fact);
-            skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.QUERY_INTERVAL_BIGGER);
-            continue;
-          }
-          if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) {
-            log.info("Skipping update period {} for fact {} for storage {}", updatePeriod, fact, storage);
-            skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID);
-            continue;
-          }
-          Set<String> storageTables = storageTableMap.get(updatePeriod);
-          if (storageTables == null) {
-            storageTables = new LinkedHashSet<>();
-            storageTableMap.put(updatePeriod, storageTables);
-          }
-          isStorageAdded = true;
-          log.debug("Adding storage table:{} for fact:{} for update period {}", table, fact, updatePeriod);
-          storageTables.add(table);
-        }
-        if (!isStorageAdded) {
-          skipStorageCauses.put(storage, SkipStorageCause.noCandidateUpdatePeriod(skipUpdatePeriodCauses));
-        }
-      }
-      skipStorageCausesPerFact.put(fact, skipStorageCauses);
-      if (storageTableMap.isEmpty()) {
-        log.info("Not considering fact table:{} as it does not have any storage tables", fact);
-        cubeql.addFactPruningMsgs(fact, noCandidateStorages(skipStorageCauses));
-        i.remove();
-      }
-    }
-  }
-
-  private TreeSet<UpdatePeriod> getValidUpdatePeriods(CubeFactTable fact) {
-    TreeSet<UpdatePeriod> set = new TreeSet<UpdatePeriod>();
-    set.addAll(validStorageMap.get(fact).keySet());
-    return set;
-  }
-
-  String getStorageTableName(CubeFactTable fact, String storage, List<String> validFactStorageTables) {
-    String tableName = getFactOrDimtableStorageTableName(fact.getName(), storage).toLowerCase();
-    if (validFactStorageTables != null && !validFactStorageTables.contains(tableName)) {
-      log.info("Skipping storage table {} as it is not valid", tableName);
-      return null;
-    }
-    return tableName;
-  }
-
-  private TimeRange getFallbackRange(TimeRange range, CandidateFact cfact, CubeQueryContext cubeql)
-    throws LensException {
-    Cube baseCube = cubeql.getBaseCube();
-    ArrayList<String> tableNames = Lists.newArrayList(cfact.fact.getName(), cubeql.getCube().getName());
-    if (!cubeql.getCube().getName().equals(baseCube.getName())) {
-      tableNames.add(baseCube.getName());
-    }
-    String fallBackString = null;
-    String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn());
-    for (String tableName : tableNames) {
-      fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters()
-        .get(MetastoreConstants.TIMEDIM_RELATION + timedim);
-      if (StringUtils.isNotBlank(fallBackString)) {
-        break;
-      }
-    }
-    if (StringUtils.isBlank(fallBackString)) {
-      return null;
-    }
-    Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, ""));
-    if (!matcher.matches()) {
-      return null;
-    }
-    DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim());
-    DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim());
-    String relatedTimeDim = matcher.group(1).trim();
-    String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim);
-    return TimeRange.getBuilder()
-      .fromDate(diff2.negativeOffsetFrom(range.getFromDate()))
-      .toDate(diff1.negativeOffsetFrom(range.getToDate()))
-      .partitionColumn(fallbackPartCol).build();
-  }
-
-  private void resolveFactStoragePartitions(CubeQueryContext cubeql) throws LensException {
-    // Find candidate tables wrt supported storages
-    Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
-    while (i.hasNext()) {
-      CandidateFact cfact = i.next();
-      Map<TimeRange, String> whereClauseForFallback = new LinkedHashMap<TimeRange, String>();
-      List<FactPartition> answeringParts = new ArrayList<>();
-      Map<String, SkipStorageCause> skipStorageCauses = skipStorageCausesPerFact.get(cfact.fact);
-      if (skipStorageCauses == null) {
-        skipStorageCauses = new HashMap<>();
-      }
-      PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns();
-      boolean noPartsForRange = false;
-      Set<String> unsupportedTimeDims = Sets.newHashSet();
-      Set<String> partColsQueried = Sets.newHashSet();
-      for (TimeRange range : cubeql.getTimeRanges()) {
-        partColsQueried.add(range.getPartitionColumn());
-        StringBuilder extraWhereClause = new StringBuilder();
-        Set<FactPartition> rangeParts = getPartitions(cfact.fact, range, skipStorageCauses, missingParts);
-        // If no partitions were found, then we'll fallback.
-        String partCol = range.getPartitionColumn();
-        boolean partColNotSupported = rangeParts.isEmpty();
-        for (String storage : cfact.fact.getStorages()) {
-          String storageTableName = getFactOrDimtableStorageTableName(cfact.fact.getName(), storage).toLowerCase();
-          partColNotSupported &= skipStorageCauses.containsKey(storageTableName)
-            && skipStorageCauses.get(storageTableName).getCause().equals(PART_COL_DOES_NOT_EXIST)
-            && skipStorageCauses.get(storageTableName).getNonExistantPartCols().contains(partCol);
-        }
-        TimeRange prevRange = range;
-        String sep = "";
-        while (rangeParts.isEmpty()) {
-          // TODO: should we add a condition whether on range's partcol any missing partitions are not there
-          String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol);
-          if (partColNotSupported && !cfact.getColumns().contains(timeDim)) {
-            unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(range.getPartitionColumn()));
-            break;
-          }
-          TimeRange fallBackRange = getFallbackRange(prevRange, cfact, cubeql);
-          log.info("No partitions for range:{}. fallback range: {}", range, fallBackRange);
-          if (fallBackRange == null) {
-            break;
-          }
-          partColsQueried.add(fallBackRange.getPartitionColumn());
-          rangeParts = getPartitions(cfact.fact, fallBackRange, skipStorageCauses, missingParts);
-          extraWhereClause.append(sep)
-            .append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim));
-          sep = " AND ";
-          prevRange = fallBackRange;
-          partCol = prevRange.getPartitionColumn();
-          if (!rangeParts.isEmpty()) {
-            break;
-          }
-        }
-        whereClauseForFallback.put(range, extraWhereClause.toString());
-        if (rangeParts.isEmpty()) {
-          log.info("No partitions for fallback range:{}", range);
-          noPartsForRange = true;
-          continue;
-        }
-        // If multiple storage tables are part of the same fact,
-        // capture range->storage->partitions
-        Map<String, LinkedHashSet<FactPartition>> tablePartMap = new HashMap<String, LinkedHashSet<FactPartition>>();
-        for (FactPartition factPart : rangeParts) {
-          for (String table : factPart.getStorageTables()) {
-            if (!tablePartMap.containsKey(table)) {
-              tablePartMap.put(table, new LinkedHashSet<>(Collections.singletonList(factPart)));
-            } else {
-              LinkedHashSet<FactPartition> storagePart = tablePartMap.get(table);
-              storagePart.add(factPart);
-            }
-          }
-        }
-        cfact.getRangeToStoragePartMap().put(range, tablePartMap);
-        cfact.incrementPartsQueried(rangeParts.size());
-        answeringParts.addAll(rangeParts);
-        cfact.getPartsQueried().addAll(rangeParts);
-      }
-      if (!unsupportedTimeDims.isEmpty()) {
-        log.info("Not considering fact table:{} as it doesn't support time dimensions: {}", cfact.fact,
-          unsupportedTimeDims);
-        cubeql.addFactPruningMsgs(cfact.fact, timeDimNotSupported(unsupportedTimeDims));
-        i.remove();
-        continue;
-      }
-      Set<String> nonExistingParts = missingParts.toSet(partColsQueried);
-      if (!nonExistingParts.isEmpty()) {
-        addNonExistingParts(cfact.fact.getName(), nonExistingParts);
-      }
-      if (cfact.getNumQueriedParts() == 0 || (failOnPartialData && (noPartsForRange || !nonExistingParts.isEmpty()))) {
-        log.info("Not considering fact table:{} as it could not find partition for given ranges: {}", cfact.fact,
-          cubeql.getTimeRanges());
-        /*
-         * This fact is getting discarded because of any of following reasons:
-         * 1. Has missing partitions
-         * 2. All Storage tables were skipped for some reasons.
-         * 3. Storage tables do not have the update period for the timerange queried.
-         */
-        if (failOnPartialData && !nonExistingParts.isEmpty()) {
-          cubeql.addFactPruningMsgs(cfact.fact, missingPartitions(nonExistingParts));
-        } else if (!skipStorageCauses.isEmpty()) {
-          CandidateTablePruneCause cause = noCandidateStorages(skipStorageCauses);
-          cubeql.addFactPruningMsgs(cfact.fact, cause);
-        } else {
-          CandidateTablePruneCause cause =
-            new CandidateTablePruneCause(NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE);
-          cubeql.addFactPruningMsgs(cfact.fact, cause);
-        }
-        i.remove();
+      storageTable = sc.getStorageTable();
+      // Check if the storage table is in the list of valid storage tables.
+      if (validFactStorageTables != null && !validFactStorageTables.contains(storageTable)) {
+        log.info("Skipping storage table {} as it is not valid", storageTable);
+        cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.INVALID_STORAGE));
+        it.remove();
         continue;
       }
-      // Map from storage to covering parts
-      Map<String, Set<FactPartition>> minimalStorageTables = new LinkedHashMap<String, Set<FactPartition>>();
-      StorageUtil.getMinimalAnsweringTables(answeringParts, minimalStorageTables);
-      if (minimalStorageTables.isEmpty()) {
-        log.info("Not considering fact table:{} as it does not have any storage tables", cfact);
-        cubeql.addFactPruningMsgs(cfact.fact, noCandidateStorages(skipStorageCauses));
-        i.remove();
+      List<String> validUpdatePeriods = CubeQueryConfUtil
+        .getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(sc.getFact().getName(), sc.getStorageName()));
+      boolean isUpdatePeriodForStorageAdded = false;
+      Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<>();
+
+      if (cubeql.getTimeRanges().stream().noneMatch(range -> CandidateUtil.isPartiallyValidForTimeRange(sc, range))) {
+        cubeql.addStoragePruningMsg(sc,
+          new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE));
+        it.remove();
         continue;
       }
-      Set<String> storageTables = new LinkedHashSet<>();
-      storageTables.addAll(minimalStorageTables.keySet());
-      cfact.setStorageTables(storageTables);
-      // Update range->storage->partitions with time range where clause
-      for (TimeRange trange : cfact.getRangeToStoragePartMap().keySet()) {
-        Map<String, String> rangeToWhere = new HashMap<>();
-        for (Map.Entry<String, Set<FactPartition>> entry : minimalStorageTables.entrySet()) {
-          String table = entry.getKey();
-          Set<FactPartition> minimalParts = entry.getValue();
-
-          LinkedHashSet<FactPartition> rangeParts = cfact.getRangeToStoragePartMap().get(trange).get(table);
-          LinkedHashSet<FactPartition> minimalPartsCopy = Sets.newLinkedHashSet();
-
-          if (rangeParts != null) {
-            minimalPartsCopy.addAll(minimalParts);
-            minimalPartsCopy.retainAll(rangeParts);
-          }
-          if (!StringUtils.isEmpty(whereClauseForFallback.get(trange))) {
-            rangeToWhere.put(table, "(("
-              + rangeWriter.getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()),
-                minimalPartsCopy) + ") and  (" + whereClauseForFallback.get(trange) + "))");
-          } else {
-            rangeToWhere.put(table, rangeWriter.getTimeRangeWhereClause(cubeql,
-              cubeql.getAliasForTableName(cubeql.getCube().getName()), minimalPartsCopy));
-          }
-        }
-        cfact.getRangeToStorageWhereMap().put(trange, rangeToWhere);
-      }
-      log.info("Resolved partitions for fact {}: {} storageTables:{}", cfact, answeringParts, storageTables);
-    }
-  }
 
-  private static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias,
-                                                        Set<String> measureTag,
-                                                        Map<String, String> tagToMeasureOrExprMap) {
-    CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol);
-    if (column != null && column.getTags() != null) {
-      String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG);
-      //Checking if dataCompletenessTag is set for queried measure
-      if (dataCompletenessTag != null) {
-        measureTag.add(dataCompletenessTag);
-        String value = tagToMeasureOrExprMap.get(dataCompletenessTag);
-        if (value == null) {
-          tagToMeasureOrExprMap.put(dataCompletenessTag, alias);
+      // Populate valid update periods and check validity at the update period level
+      for (UpdatePeriod updatePeriod : sc.getFact().getUpdatePeriods().get(sc.getStorageName())) {
+        if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) {
+          // if user supplied max interval, all intervals larger than that are useless.
+          log.info("Skipping update period {} for candidate {} since it's more than max interval supplied({})",
+            updatePeriod, sc.getStorageTable(), maxInterval);
+          skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.UPDATE_PERIOD_BIGGER_THAN_MAX);
+        } else if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) {
+          // if user supplied valid update periods, other update periods are useless
+          log.info("Skipping update period {} for candidate {} for storage {} since it's invalid",
+            updatePeriod, sc.getStorageTable(), storageTable);
+          skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID);
+        } else if (!sc.isUpdatePeriodUseful(updatePeriod)) {
+          // if the update period cannot help answer any of the queried time ranges, skip it
+          skipUpdatePeriodCauses.put(updatePeriod.toString(),
+            SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD);
         } else {
-          value = value.concat(",").concat(alias);
-          tagToMeasureOrExprMap.put(dataCompletenessTag, value);
-        }
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private static void processMeasuresFromExprMeasures(CubeQueryContext cubeql, Set<String> measureTag,
-                                                             Map<String, String> tagToMeasureOrExprMap) {
-    boolean isExprProcessed;
-    String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName());
-    for (String expr : cubeql.getQueriedExprsWithMeasures()) {
-      isExprProcessed = false;
-      for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias)
-              .getAllExprs()) {
-        if (esc.getTblAliasToColumns().get(cubeAlias) != null) {
-          for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) {
-            if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) {
-              /* This is done to associate the expression with one of the dataCompletenessTag for the measures.
-              So, even if the expression is composed of measures with different dataCompletenessTags, we will be
-              determining the dataCompleteness from one of the measure and this expression is grouped with the
-              other queried measures that have the same dataCompletenessTag. */
-              isExprProcessed = true;
-              break;
-            }
-          }
-        }
-        if (isExprProcessed) {
-          break;
+          isUpdatePeriodForStorageAdded = true;
+          sc.addValidUpdatePeriod(updatePeriod);
         }
       }
-    }
-  }
-
-  private void resolveFactCompleteness(CubeQueryContext cubeql) throws LensException {
-    if (client == null || client.getCompletenessChecker() == null || completenessPartCol == null) {
-      return;
-    }
-    DataCompletenessChecker completenessChecker = client.getCompletenessChecker();
-    Set<String> measureTag = new HashSet<>();
-    Map<String, String> tagToMeasureOrExprMap = new HashMap<>();
-
-    processMeasuresFromExprMeasures(cubeql, measureTag, tagToMeasureOrExprMap);
-
-    Set<String> measures = cubeql.getQueriedMsrs();
-    if (measures == null) {
-      measures = new HashSet<>();
-    }
-    for (String measure : measures) {
-      processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap);
-    }
-    //Checking if dataCompletenessTag is set for the fact
-    if (measureTag.isEmpty()) {
-      log.info("No Queried measures with the dataCompletenessTag, hence skipping the availability check");
-      return;
-    }
-    Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
-    DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-    formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
-    while (i.hasNext()) {
-      CandidateFact cFact = i.next();
-      // Map from measure to the map from partition to %completeness
-      Map<String, Map<String, Float>> incompleteMeasureData = new HashMap<>();
-
-      String factDataCompletenessTag = cFact.fact.getDataCompletenessTag();
-      if (factDataCompletenessTag == null) {
-        log.info("Not checking completeness for the fact table:{} as the dataCompletenessTag is not set", cFact.fact);
-        continue;
+      // For DEBUG purposes only, to see why some update periods are skipped.
+      if (!skipUpdatePeriodCauses.isEmpty()) {
+        sc.setUpdatePeriodRejectionCause(skipUpdatePeriodCauses);
       }
-      boolean isFactDataIncomplete = false;
-      for (TimeRange range : cubeql.getTimeRanges()) {
-        if (!range.getPartitionColumn().equals(completenessPartCol)) {
-          log.info("Completeness check not available for partCol:{}", range.getPartitionColumn());
-          continue;
+      // if no update periods were added in the previous step, we skip this storage candidate
+      if (!isUpdatePeriodForStorageAdded) {
+        if (skipUpdatePeriodCauses.values().stream().allMatch(
+          SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD::equals)) {
+          // all update periods were rejected as unable to answer the queried time range, so the candidate's time range is not answerable.
+          cubeql.addStoragePruningMsg(sc,
+            new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE));
+        } else { // Update periods are rejected for multiple reasons.
+          cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.updatePeriodsRejected(skipUpdatePeriodCauses));
         }
-        Date from = range.getFromDate();
-        Date to = range.getToDate();
-        Map<String, Map<Date, Float>> completenessMap =  completenessChecker.getCompleteness(factDataCompletenessTag,
-                from, to, measureTag);
-        if (completenessMap != null && !completenessMap.isEmpty()) {
-          for (Map.Entry<String, Map<Date, Float>> measureCompleteness : completenessMap.entrySet()) {
-            String tag = measureCompleteness.getKey();
-            for (Map.Entry<Date, Float> completenessResult : measureCompleteness.getValue().entrySet()) {
-              if (completenessResult.getValue() < completenessThreshold) {
-                log.info("Completeness for the measure_tag {} is {}, threshold: {}, for the hour {}", tag,
-                        completenessResult.getValue(), completenessThreshold,
-                        formatter.format(completenessResult.getKey()));
-                String measureorExprFromTag = tagToMeasureOrExprMap.get(tag);
-                Map<String, Float> incompletePartition = incompleteMeasureData.get(measureorExprFromTag);
-                if (incompletePartition == null) {
-                  incompletePartition = new HashMap<>();
-                  incompleteMeasureData.put(measureorExprFromTag, incompletePartition);
+        it.remove();
+      } else {
+        // set the dates again as they can change based on the valid update periods
+        sc.setStorageStartAndEndDate();
+        Set<CandidateTablePruneCause> allPruningCauses = new HashSet<>(cubeql.getTimeRanges().size());
+        for (TimeRange range : cubeql.getTimeRanges()) {
+          CandidateTablePruneCause pruningCauseForThisTimeRange = null;
+          if (!CandidateUtil.isPartiallyValidForTimeRange(sc, range)) {
+            //This is the prune cause
+            pruningCauseForThisTimeRange =
+              new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE);
+          } else if (cubeql.shouldReplaceTimeDimWithPart()) {
+            if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(), range.getPartitionColumn())) {
+              pruningCauseForThisTimeRange = partitionColumnsMissing(range.getPartitionColumn());
+              TimeRange fallBackRange = StorageUtil.getFallbackRange(range, sc.getFact().getName(), cubeql);
+              while (fallBackRange != null) {
+                pruningCauseForThisTimeRange = null;
+                if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(),
+                  fallBackRange.getPartitionColumn())) {
+                  pruningCauseForThisTimeRange = partitionColumnsMissing(fallBackRange.getPartitionColumn());
+                  fallBackRange = StorageUtil.getFallbackRange(fallBackRange, sc.getFact().getName(), cubeql);
+                } else {
+                  if (!CandidateUtil.isPartiallyValidForTimeRange(sc, fallBackRange)) {
+                    pruningCauseForThisTimeRange =
+                      new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE);
+                  }
+                  break;
                 }
-                incompletePartition.put(formatter.format(completenessResult.getKey()), completenessResult.getValue());
-                isFactDataIncomplete = true;
               }
             }
           }
+
+          if (pruningCauseForThisTimeRange != null) {
+            allPruningCauses.add(pruningCauseForThisTimeRange);
+          }
         }
-      }
-      if (isFactDataIncomplete) {
-        log.info("Fact table:{} has partitions with incomplete data: {} for given ranges: {}", cFact.fact,
-                incompleteMeasureData, cubeql.getTimeRanges());
-        if (failOnPartialData) {
-          i.remove();
-          cubeql.addFactPruningMsgs(cFact.fact, incompletePartitions(incompleteMeasureData));
-        } else {
-          cFact.setDataCompletenessMap(incompleteMeasureData);
+        if (!allPruningCauses.isEmpty()) {
+          // TODO if this storage can answer at least one time range, why prune it?
+          it.remove();
+          cubeql.addStoragePruningMsg(sc, allPruningCauses.toArray(new CandidateTablePruneCause[0]));
         }
       }
     }
   }
 
-  void addNonExistingParts(String name, Set<String> nonExistingParts) {
+  private void addNonExistingParts(String name, Set<String> nonExistingParts) {
     nonExistingPartitions.put(name, nonExistingParts);
   }
 
-  private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range,
-    Map<String, SkipStorageCause> skipStorageCauses,
-    PartitionRangesForPartitionColumns missingPartitions) throws LensException {
-    try {
-      return getPartitions(fact, range, getValidUpdatePeriods(fact), true, failOnPartialData, skipStorageCauses,
-        missingPartitions);
-    } catch (Exception e) {
-      throw new LensException(e);
-    }
-  }
-
-  private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range, TreeSet<UpdatePeriod> updatePeriods,
-    boolean addNonExistingParts, boolean failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses,
-    PartitionRangesForPartitionColumns missingPartitions)
-    throws Exception {
-    Set<FactPartition> partitions = new TreeSet<>();
-    if (range != null && range.isCoverableBy(updatePeriods)
-      && getPartitions(fact, range.getFromDate(), range.getToDate(), range.getPartitionColumn(), partitions,
-        updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions)) {
-      return partitions;
-    } else {
-      return new TreeSet<>();
-    }
-  }
-
-  private boolean getPartitions(CubeFactTable fact, Date fromDate, Date toDate, String partCol,
-    Set<FactPartition> partitions, TreeSet<UpdatePeriod> updatePeriods,
-    boolean addNonExistingParts, boolean failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses,
-    PartitionRangesForPartitionColumns missingPartitions)
-    throws Exception {
-    log.info("getPartitions for {} from fromDate:{} toDate:{}", fact, fromDate, toDate);
-    if (fromDate.equals(toDate) || fromDate.after(toDate)) {
-      return true;
-    }
-    UpdatePeriod interval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods);
-    if (interval == null) {
-      log.info("No max interval for range: {} to {}", fromDate, toDate);
-      return false;
-    }
-    log.debug("Max interval for {} is: {}", fact, interval);
-    Set<String> storageTbls = new LinkedHashSet<String>();
-    storageTbls.addAll(validStorageMap.get(fact).get(interval));
-
-    if (interval == UpdatePeriod.CONTINUOUS && rangeWriter.getClass().equals(BetweenTimeRangeWriter.class)) {
-      for (String storageTbl : storageTbls) {
-        FactPartition part = new FactPartition(partCol, fromDate, interval, null, partWhereClauseFormat);
-        partitions.add(part);
-        part.getStorageTables().add(storageTbl);
-        part = new FactPartition(partCol, toDate, interval, null, partWhereClauseFormat);
-        partitions.add(part);
-        part.getStorageTables().add(storageTbl);
-        log.info("Added continuous fact partition for storage table {}", storageTbl);
-      }
-      return true;
-    }
-
-    Iterator<String> it = storageTbls.iterator();
-    while (it.hasNext()) {
-      String storageTableName = it.next();
-      if (!client.isStorageTableCandidateForRange(storageTableName, fromDate, toDate)) {
-        skipStorageCauses.put(storageTableName, new SkipStorageCause(RANGE_NOT_ANSWERABLE));
-        it.remove();
-      } else if (!client.partColExists(storageTableName, partCol)) {
-        log.info("{} does not exist in {}", partCol, storageTableName);
-        skipStorageCauses.put(storageTableName, SkipStorageCause.partColDoesNotExist(partCol));
-        it.remove();
-      }
-    }
-
-    if (storageTbls.isEmpty()) {
-      return false;
-    }
-    Date ceilFromDate = DateUtil.getCeilDate(fromDate, interval);
-    Date floorToDate = DateUtil.getFloorDate(toDate, interval);
-
-    int lookAheadNumParts =
-      conf.getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(interval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS);
-
-    TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, interval, 1).iterator();
-    // add partitions from ceilFrom to floorTo
-    while (iter.hasNext()) {
-      Date dt = iter.next();
-      Date nextDt = iter.peekNext();
-      FactPartition part = new FactPartition(partCol, dt, interval, null, partWhereClauseFormat);
-      log.debug("candidate storage tables for searching partitions: {}", storageTbls);
-      updateFactPartitionStorageTablesFrom(fact, part, storageTbls);
-      log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables());
-      if (part.isFound()) {
-        log.debug("Adding existing partition {}", part);
-        partitions.add(part);
-        log.debug("Looking for look ahead process time partitions for {}", part);
-        if (processTimePartCol == null) {
-          log.debug("processTimePartCol is null");
-        } else if (partCol.equals(processTimePartCol)) {
-          log.debug("part column is process time col");
-        } else if (updatePeriods.first().equals(interval)) {
-          log.debug("Update period is the least update period");
-        } else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) {
-          // see if this is the part of the last-n look ahead partitions
-          log.debug("Not a look ahead partition");
-        } else {
-          log.debug("Looking for look ahead process time partitions for {}", part);
-          // check if finer partitions are required
-          // final partitions are required if no partitions from
-          // look-ahead
-          // process time are present
-          TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts,
-            interval, 1).iterator();
-          while (processTimeIter.hasNext()) {
-            Date pdt = processTimeIter.next();
-            Date nextPdt = processTimeIter.peekNext();
-            FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, interval, null,
-              partWhereClauseFormat);
-            updateFactPartitionStorageTablesFrom(fact, processTimePartition,
-              part.getStorageTables());
-            if (processTimePartition.isFound()) {
-              log.debug("Finer parts not required for look-ahead partition :{}", part);
-            } else {
-              log.debug("Looked ahead process time partition {} is not found", processTimePartition);
-              TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
-              newset.addAll(updatePeriods);
-              newset.remove(interval);
-              log.debug("newset of update periods:{}", newset);
-              if (!newset.isEmpty()) {
-                // Get partitions for look ahead process time
-                log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt);
-                Set<FactPartition> processTimeParts =
-                  getPartitions(fact, TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn(
-                    processTimePartCol).build(), newset, true, false, skipStorageCauses, missingPartitions);
-                log.debug("Look ahead partitions: {}", processTimeParts);
-                TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build();
-                for (FactPartition pPart : processTimeParts) {
-                  log.debug("Looking for finer partitions in pPart: {}", pPart);
-                  for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) {
-                    FactPartition innerPart = new FactPartition(partCol, date, pPart.getPeriod(), pPart,
-                      partWhereClauseFormat);
-                    updateFactPartitionStorageTablesFrom(fact, innerPart, pPart);
-                    if (innerPart.isFound()) {
-                      partitions.add(innerPart);
-                    }
-                  }
-                  log.debug("added all sub partitions blindly in pPart: {}", pPart);
-                }
-              }
-            }
-          }
-        }
-      } else {
-        log.info("Partition:{} does not exist in any storage table", part);
-        TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
-        newset.addAll(updatePeriods);
-        newset.remove(interval);
-        if (!getPartitions(fact, dt, nextDt, partCol, partitions, newset, false, failOnPartialData, skipStorageCauses,
-          missingPartitions)) {
+  enum PHASE {
+    STORAGE_TABLES, STORAGE_PARTITIONS, DIM_TABLE_AND_PARTITIONS;
 
-          log.debug("Adding non existing partition {}", part);
-          if (addNonExistingParts) {
-            // Add non existing partitions for all cases of whether we populate all non existing or not.
-            missingPartitions.add(part);
-            if (!failOnPartialData) {
-              Set<String> st = getStorageTablesWithoutPartCheck(part, storageTbls);
-              if (st.isEmpty()) {
-                log.info("No eligible storage tables");
-                return false;
-              }
-              partitions.add(part);
-              part.getStorageTables().addAll(st);
-            }
-          } else {
-            log.info("No finer granual partitions exist for {}", part);
-            return false;
-          }
-        } else {
-          log.debug("Finer granual partitions added for {}", part);
-        }
-      }
+    static PHASE first() {
+      return values()[0];
     }
-    return getPartitions(fact, fromDate, ceilFromDate, partCol, partitions,
-      updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions)
-      && getPartitions(fact, floorToDate, toDate, partCol, partitions,
-        updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions);
-  }
 
-  private Set<String> getStorageTablesWithoutPartCheck(FactPartition part,
-    Set<String> storageTableNames) throws LensException, HiveException {
-    Set<String> validStorageTbls = new HashSet<>();
-    for (String storageTableName : storageTableNames) {
-      // skip all storage tables for which are not eligible for this partition
-      if (client.isStorageTablePartitionACandidate(storageTableName, part.getPartSpec())) {
-        validStorageTbls.add(storageTableName);
-      } else {
-        log.info("Skipping {} as it is not valid for part {}", storageTableName, part.getPartSpec());
-      }
+    static PHASE last() {
+      return values()[values().length - 1];
     }
-    return validStorageTbls;
-  }
 
-  private void updateFactPartitionStorageTablesFrom(CubeFactTable fact,
-    FactPartition part, Set<String> storageTableNames) throws LensException, HiveException, ParseException {
-    for (String storageTableName : storageTableNames) {
-      // skip all storage tables for which are not eligible for this partition
-      if (client.isStorageTablePartitionACandidate(storageTableName, part.getPartSpec())
-        && (client.factPartitionExists(fact, part, storageTableName))) {
-        part.getStorageTables().add(storageTableName);
-        part.setFound(true);
-      }
+    PHASE next() {
+      return values()[(this.ordinal() + 1) % values().length];
     }
   }
-
-  private void updateFactPartitionStorageTablesFrom(CubeFactTable fact,
-    FactPartition part, FactPartition pPart) throws LensException, HiveException, ParseException {
-    updateFactPartitionStorageTablesFrom(fact, part, pPart.getStorageTables());
-    part.setFound(part.isFound() && pPart.isFound());
-  }
 }
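
The resolver now runs in three phases, one per rewriteContext() call:
STORAGE_TABLES, then STORAGE_PARTITIONS, then DIM_TABLE_AND_PARTITIONS, driven
by the PHASE enum moved to the bottom of the class above. A small standalone
sketch of the same ordinal-based cycling (PhaseSketch is an assumed wrapper
class, not part of the patch):

    public class PhaseSketch {
      enum PHASE {
        STORAGE_TABLES, STORAGE_PARTITIONS, DIM_TABLE_AND_PARTITIONS;

        static PHASE first() {
          return values()[0];
        }

        PHASE next() {
          return values()[(this.ordinal() + 1) % values().length];
        }
      }

      public static void main(String[] args) {
        PHASE phase = PHASE.first();
        for (int i = 0; i < 4; i++) {
          System.out.println(phase); // three phases in order, then wraps to STORAGE_TABLES
          phase = phase.next();
        }
      }
    }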

http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
index f9636d1..f5cd540 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -18,13 +18,19 @@
  */
 package org.apache.lens.cube.parse;
 
+import static org.apache.lens.cube.metadata.DateUtil.WSPACE;
+
 import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
-import org.apache.lens.cube.metadata.FactPartition;
-import org.apache.lens.cube.metadata.StorageConstants;
+import org.apache.lens.cube.metadata.*;
+import org.apache.lens.server.api.error.LensException;
 
 import org.apache.commons.lang.StringUtils;
 
+import com.google.common.collect.Lists;
+
 public final class StorageUtil {
   private StorageUtil() {
 
@@ -69,8 +75,8 @@ public final class StorageUtil {
     String sep = "";
     for (String timePartCol : timedDimensions) {
       if (!timePartCol.equals(partCol)) {
-        sb.append(sep).append(alias).append(".").append(timePartCol)
-          .append(" != '").append(StorageConstants.LATEST_PARTITION_VALUE).append("'");
+        sb.append(sep).append(alias).append(".").append(timePartCol).append(" != '")
+          .append(StorageConstants.LATEST_PARTITION_VALUE).append("'");
         sep = " AND ";
       }
     }
@@ -82,15 +88,11 @@ public final class StorageUtil {
     String sep = "((";
     for (String clause : clauses) {
       if (clause != null && !clause.isEmpty()) {
-        sb
-          .append(sep)
-          .append(clause);
+        sb.append(sep).append(clause);
         sep = ") AND (";
       }
     }
-    return sb
-      .append(sep.equals("((") ? "" : "))")
-      .toString();
+    return sb.append(sep.equals("((") ? "" : "))").toString();
   }
 
   /**
@@ -161,4 +163,112 @@ public final class StorageUtil {
       return null;
     }
   }
+
+  /**
+   * Gets the fallback time range for the given range, based on the timedim relation parameters
+   * defined on the fact, the queried cube or the base cube.
+   *
+   * @param range    time range for which a fallback range is needed
+   * @param factName name of the fact table under consideration
+   * @param cubeql   cube query context
+   * @return the fallback time range, or null if no fallback relation is defined
+   * @throws LensException on metastore access errors
+   */
+  public static TimeRange getFallbackRange(TimeRange range, String factName, CubeQueryContext cubeql)
+    throws LensException {
+    Cube baseCube = cubeql.getBaseCube();
+    ArrayList<String> tableNames = Lists.newArrayList(factName, cubeql.getCube().getName());
+    if (!cubeql.getCube().getName().equals(baseCube.getName())) {
+      tableNames.add(baseCube.getName());
+    }
+    String fallBackString = null;
+    String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn());
+    for (String tableName : tableNames) {
+      fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters()
+        .get(MetastoreConstants.TIMEDIM_RELATION + timedim);
+      if (StringUtils.isNotBlank(fallBackString)) {
+        break;
+      }
+    }
+    if (StringUtils.isBlank(fallBackString)) {
+      return null;
+    }
+    Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, ""));
+    if (!matcher.matches()) {
+      return null;
+    }
+    DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim());
+    DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim());
+    String relatedTimeDim = matcher.group(1).trim();
+    String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim);
+    return TimeRange.getBuilder().fromDate(diff2.negativeOffsetFrom(range.getFromDate()))
+      .toDate(diff1.negativeOffsetFrom(range.getToDate())).partitionColumn(fallbackPartCol).build();
+  }
+
+  /**
+   * Checks whether data completeness can be tracked for a queried column.
+   * See {@link org.apache.lens.server.api.metastore.DataCompletenessChecker}.
+   *
+   * @param cubeql                cube query context
+   * @param cubeCol               queried cube column
+   * @param alias                 alias or expression under which the column is queried
+   * @param measureTag            set collecting the data completeness tags seen so far
+   * @param tagToMeasureOrExprMap mapping from completeness tag to the measures/expressions carrying it
+   * @return true if the measure carries a completeness tag and was recorded
+   */
+  public static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias,
+    Set<String> measureTag, Map<String, String> tagToMeasureOrExprMap) {
+    CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol);
+    if (column != null && column.getTags() != null) {
+      String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG);
+      //Checking if dataCompletenessTag is set for queried measure
+      if (dataCompletenessTag != null) {
+        measureTag.add(dataCompletenessTag);
+        String value = tagToMeasureOrExprMap.get(dataCompletenessTag);
+        if (value == null) {
+          tagToMeasureOrExprMap.put(dataCompletenessTag, alias);
+        } else {
+          value = value.concat(",").concat(alias);
+          tagToMeasureOrExprMap.put(dataCompletenessTag, value);
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Extracts all the columns used in the queried expressions and evaluates each
+   * column separately for completeness.
+   *
+   * @param cubeql                cube query context
+   * @param measureTag            set collecting the data completeness tags seen so far
+   * @param tagToMeasureOrExprMap mapping from completeness tag to the measures/expressions carrying it
+   */
+  public static void processExpressionsForCompleteness(CubeQueryContext cubeql, Set<String> measureTag,
+    Map<String, String> tagToMeasureOrExprMap) {
+    boolean isExprProcessed;
+    String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName());
+    for (String expr : cubeql.getQueriedExprsWithMeasures()) {
+      isExprProcessed = false;
+      for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias)
+        .getAllExprs()) {
+        if (esc.getTblAliasToColumns().get(cubeAlias) != null) {
+          for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) {
+            if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) {
+              /* This associates the expression with one of the dataCompletenessTags of its measures.
+              So even if the expression is composed of measures with different dataCompletenessTags,
+              completeness is determined from one of those measures, and the expression is grouped with
+              the other queried measures that share that dataCompletenessTag. */
+              isExprProcessed = true;
+              break;
+            }
+          }
+        }
+        if (isExprProcessed) {
+          break;
+        }
+      }
+    }
+  }
 }
+
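
For illustration, a minimal standalone sketch of the relation-string format that
getFallbackRange() above parses out of the table parameter keyed by
MetastoreConstants.TIMEDIM_RELATION + timedim. The parameter value and time dimension
name below are hypothetical, and the regex \s+ stands in for DateUtil.WSPACE:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FallbackRelationDemo {
  public static void main(String[] args) {
    // Hypothetical relation value: "<related time dim> + [<start offset>, <end offset>]"
    String fallBackString = "processing_time + [-5 days, -1 hour]";
    Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]")
      .matcher(fallBackString.replaceAll("\\s+", ""));
    if (matcher.matches()) {
      System.out.println("related time dim: " + matcher.group(1)); // processing_time
      System.out.println("start offset    : " + matcher.group(2)); // -5days
      System.out.println("end offset      : " + matcher.group(3)); // -1hour
    }
  }
}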

http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java
deleted file mode 100644
index f18ae36..0000000
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.cube.parse;
-
-import static org.apache.hadoop.hive.ql.parse.HiveParser.*;
-
-import java.util.*;
-
-import org.apache.lens.cube.error.ColUnAvailableInTimeRange;
-import org.apache.lens.cube.error.ColUnAvailableInTimeRangeException;
-import org.apache.lens.cube.error.LensCubeErrorCode;
-import org.apache.lens.cube.metadata.*;
-import org.apache.lens.cube.metadata.join.JoinPath;
-import org.apache.lens.cube.parse.join.AutoJoinContext;
-import org.apache.lens.server.api.LensConfConstants;
-import org.apache.lens.server.api.error.LensException;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-
-import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class TimeRangeChecker implements ContextRewriter {
-  public TimeRangeChecker(Configuration conf) {
-  }
-  @Override
-  public void rewriteContext(CubeQueryContext cubeql) throws LensException {
-    if (cubeql.getCube() == null) {
-      return;
-    }
-    doColLifeValidation(cubeql);
-    doFactRangeValidation(cubeql);
-  }
-  private void extractTimeRange(CubeQueryContext cubeql) throws LensException {
-    // get time range -
-    // Time range should be direct child of where condition
-    // TOK_WHERE.TOK_FUNCTION.Identifier Or, it should be right hand child of
-    // AND condition TOK_WHERE.KW_AND.TOK_FUNCTION.Identifier
-    if (cubeql.getWhereAST() == null || cubeql.getWhereAST().getChildCount() < 1) {
-      throw new LensException(LensCubeErrorCode.NO_TIMERANGE_FILTER.getLensErrorInfo());
-    }
-    searchTimeRanges(cubeql.getWhereAST(), cubeql, null, 0);
-  }
-
-  private void searchTimeRanges(ASTNode root, CubeQueryContext cubeql, ASTNode parent, int childIndex)
-    throws LensException {
-    if (root == null) {
-      return;
-    } else if (root.getToken().getType() == TOK_FUNCTION) {
-      ASTNode fname = HQLParser.findNodeByPath(root, Identifier);
-      if (fname != null && CubeQueryContext.TIME_RANGE_FUNC.equalsIgnoreCase(fname.getText())) {
-        processTimeRangeFunction(cubeql, root, parent, childIndex);
-      }
-    } else {
-      for (int i = 0; i < root.getChildCount(); i++) {
-        ASTNode child = (ASTNode) root.getChild(i);
-        searchTimeRanges(child, cubeql, root, i);
-      }
-    }
-  }
-
-  private String getColumnName(ASTNode node) {
-    String column = null;
-    if (node.getToken().getType() == DOT) {
-      ASTNode colIdent = (ASTNode) node.getChild(1);
-      column = colIdent.getText().toLowerCase();
-    } else if (node.getToken().getType() == TOK_TABLE_OR_COL) {
-      // Take child ident.totext
-      ASTNode ident = (ASTNode) node.getChild(0);
-      column = ident.getText().toLowerCase();
-    }
-    return column;
-  }
-
-  private void processTimeRangeFunction(CubeQueryContext cubeql, ASTNode timenode, ASTNode parent, int childIndex)
-    throws LensException {
-    TimeRange.TimeRangeBuilder builder = TimeRange.getBuilder();
-    builder.astNode(timenode);
-    builder.parent(parent);
-    builder.childIndex(childIndex);
-
-    String timeDimName = getColumnName((ASTNode) timenode.getChild(1));
-
-    if (!cubeql.getCube().getTimedDimensions().contains(timeDimName)) {
-      throw new LensException(LensCubeErrorCode.NOT_A_TIMED_DIMENSION.getLensErrorInfo(), timeDimName);
-    }
-    // Replace timeDimName with column which is used for partitioning. Assume
-    // the same column
-    // is used as a partition column in all storages of the fact
-    timeDimName = cubeql.getPartitionColumnOfTimeDim(timeDimName);
-    builder.partitionColumn(timeDimName);
-
-    String fromDateRaw = PlanUtils.stripQuotes(timenode.getChild(2).getText());
-    String toDateRaw = null;
-    if (timenode.getChildCount() > 3) {
-      ASTNode toDateNode = (ASTNode) timenode.getChild(3);
-      if (toDateNode != null) {
-        toDateRaw = PlanUtils.stripQuotes(timenode.getChild(3).getText());
-      }
-    }
-    long currentTime = cubeql.getConf().getLong(LensConfConstants.QUERY_CURRENT_TIME_IN_MILLIS, 0);
-    Date now;
-    if (currentTime != 0) {
-      now = new Date(currentTime);
-    } else {
-      now = new Date();
-    }
-    builder.fromDate(DateUtil.resolveDate(fromDateRaw, now));
-    if (StringUtils.isNotBlank(toDateRaw)) {
-      builder.toDate(DateUtil.resolveDate(toDateRaw, now));
-    } else {
-      builder.toDate(now);
-    }
-
-    TimeRange range = builder.build();
-    range.validate();
-    cubeql.getTimeRanges().add(range);
-  }
-
-  private void doColLifeValidation(CubeQueryContext cubeql) throws LensException,
-      ColUnAvailableInTimeRangeException {
-    Set<String> cubeColumns = cubeql.getColumnsQueriedForTable(cubeql.getCube().getName());
-    if (cubeColumns == null || cubeColumns.isEmpty()) {
-      // Query doesn't have any columns from cube
-      return;
-    }
-
-    for (String col : cubeql.getColumnsQueriedForTable(cubeql.getCube().getName())) {
-      CubeColumn column = cubeql.getCube().getColumnByName(col);
-      for (TimeRange range : cubeql.getTimeRanges()) {
-        if (column == null) {
-          if (!cubeql.getCube().getTimedDimensions().contains(col)) {
-            throw new LensException(LensCubeErrorCode.NOT_A_CUBE_COLUMN.getLensErrorInfo(), col);
-          }
-          continue;
-        }
-        if (!column.isColumnAvailableInTimeRange(range)) {
-          throwException(column);
-        }
-      }
-    }
-
-    // Remove join paths that have columns with invalid life span
-    AutoJoinContext joinContext = cubeql.getAutoJoinCtx();
-    if (joinContext == null) {
-      return;
-    }
-    // Get cube columns which are part of join chain
-    Set<String> joinColumns = joinContext.getAllJoinPathColumnsOfTable((AbstractCubeTable) cubeql.getCube());
-    if (joinColumns == null || joinColumns.isEmpty()) {
-      return;
-    }
-
-    // Loop over all cube columns part of join paths
-    for (String col : joinColumns) {
-      CubeColumn column = cubeql.getCube().getColumnByName(col);
-      for (TimeRange range : cubeql.getTimeRanges()) {
-        if (!column.isColumnAvailableInTimeRange(range)) {
-          log.info("Timerange queried is not in column life for {}, Removing join paths containing the column", column);
-          // Remove join paths containing this column
-          Map<Aliased<Dimension>, List<JoinPath>> allPaths = joinContext.getAllPaths();
-
-          for (Aliased<Dimension> dimension : allPaths.keySet()) {
-            List<JoinPath> joinPaths = allPaths.get(dimension);
-            Iterator<JoinPath> joinPathIterator = joinPaths.iterator();
-
-            while (joinPathIterator.hasNext()) {
-              JoinPath path = joinPathIterator.next();
-              if (path.containsColumnOfTable(col, (AbstractCubeTable) cubeql.getCube())) {
-                log.info("Removing join path: {} as columns :{} is not available in the range", path, col);
-                joinPathIterator.remove();
-                if (joinPaths.isEmpty()) {
-                  // This dimension doesn't have any paths left
-                  throw new LensException(LensCubeErrorCode.NO_JOIN_PATH.getLensErrorInfo(),
-                      "No valid join path available for dimension " + dimension + " which would satisfy time range "
-                          + range.getFromDate() + "-" + range.getToDate());
-                }
-              }
-            } // End loop to remove path
-
-          } // End loop for all paths
-        }
-      } // End time range loop
-    } // End column loop
-  }
-
-
-  private void throwException(CubeColumn column) throws ColUnAvailableInTimeRangeException {
-
-    final Long availabilityStartTime = (column.getStartTimeMillisSinceEpoch().isPresent())
-        ? column.getStartTimeMillisSinceEpoch().get() : null;
-
-    final Long availabilityEndTime = column.getEndTimeMillisSinceEpoch().isPresent()
-        ? column.getEndTimeMillisSinceEpoch().get() : null;
-
-    ColUnAvailableInTimeRange col = new ColUnAvailableInTimeRange(column.getName(), availabilityStartTime,
-        availabilityEndTime);
-
-    throw new ColUnAvailableInTimeRangeException(col);
-  }
-
-  private void doFactRangeValidation(CubeQueryContext cubeql) {
-    Iterator<CandidateFact> iter = cubeql.getCandidateFacts().iterator();
-    while (iter.hasNext()) {
-      CandidateFact cfact = iter.next();
-      List<TimeRange> invalidTimeRanges = Lists.newArrayList();
-      for (TimeRange timeRange : cubeql.getTimeRanges()) {
-        if (!cfact.isValidForTimeRange(timeRange)) {
-          invalidTimeRanges.add(timeRange);
-        }
-      }
-      if (!invalidTimeRanges.isEmpty()){
-        cubeql.addFactPruningMsgs(cfact.fact, CandidateTablePruneCause.factNotAvailableInRange(invalidTimeRanges));
-        log.info("Not considering {} as it's not available for time ranges: {}", cfact, invalidTimeRanges);
-        iter.remove();
-      }
-    }
-    cubeql.pruneCandidateFactSet(CandidateTablePruneCause.CandidateTablePruneCode.FACT_NOT_AVAILABLE_IN_RANGE);
-  }
-}
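
For reference, the deleted class contained the logic that validated and extracted the
time_range_in(...) filter from the WHERE clause: child 1 of the function node is the
time dimension, child 2 the from-date, and the optional child 3 the to-date. A query
carrying such a filter looks roughly like this (cube, column and dates illustrative):

cube select msr1 from salescube where time_range_in(delivery_time, '2017-01-01', '2017-04-01')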

http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java
new file mode 100644
index 0000000..7f07dbc
--- /dev/null
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.cube.parse;
+
+import java.util.*;
+
+import org.apache.lens.cube.metadata.FactPartition;
+import org.apache.lens.cube.metadata.TimeRange;
+import org.apache.lens.server.api.error.LensException;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Represents a union of two or more child candidates, each covering part of the queried time range.
+ */
+public class UnionCandidate implements Candidate {
+
+  /**
+   * Caching start and end time calculated for this candidate as it may have many child candidates.
+   */
+  Date startTime = null;
+  Date endTime = null;
+  String toStr;
+  CubeQueryContext cubeql;
+  /**
+   * List of child candidates that will be union-ed
+   */
+  private List<Candidate> childCandidates;
+  private QueryAST queryAst;
+  private Map<TimeRange, Map<Candidate, TimeRange>> splitTimeRangeMap = Maps.newHashMap();
+  public UnionCandidate(List<Candidate> childCandidates, CubeQueryContext cubeql) {
+    this.childCandidates = childCandidates;
+    this.cubeql = cubeql;
+  }
+
+  @Override
+  public Set<Integer> getAnswerableMeasurePhraseIndices() {
+    // All children of a UnionCandidate answer the same measure phrases, so any child's indices suffice.
+    return getChildren().iterator().next().getAnswerableMeasurePhraseIndices();
+  }
+
+  @Override
+  public boolean isTimeRangeCoverable(TimeRange timeRange) throws LensException {
+    Map<Candidate, TimeRange> candidateRange = getTimeRangeSplit(timeRange);
+    for (Map.Entry<Candidate, TimeRange> entry : candidateRange.entrySet()) {
+      if (!entry.getKey().isTimeRangeCoverable(entry.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public Collection<String> getColumns() {
+    // All children of a UnionCandidate have the same columns; return the first child's.
+    return childCandidates.iterator().next().getColumns();
+  }
+
+  @Override
+  public Date getStartTime() {
+    // Note: concurrent calls are not handled specially; recomputing the cached value is harmless.
+
+    if (startTime == null) {
+      Date minStartTime = childCandidates.get(0).getStartTime();
+      for (Candidate child : childCandidates) {
+        if (child.getStartTime().before(minStartTime)) {
+          minStartTime = child.getStartTime();
+        }
+      }
+      startTime = minStartTime;
+    }
+    return startTime;
+  }
+
+  @Override
+  public Date getEndTime() {
+    if (endTime == null) {
+      Date maxEndTime = childCandidates.get(0).getEndTime();
+      for (Candidate child : childCandidates) {
+        if (child.getEndTime().after(maxEndTime)) {
+          maxEndTime = child.getEndTime();
+        }
+      }
+      endTime = maxEndTime;
+    }
+    return endTime;
+  }
+
+  @Override
+  public double getCost() {
+    double cost = 0.0;
+    for (TimeRange timeRange : cubeql.getTimeRanges()) {
+      for (Map.Entry<Candidate, TimeRange> entry : getTimeRangeSplit(timeRange).entrySet()) {
+        cost += entry.getKey().getCost() * entry.getValue().milliseconds() / timeRange.milliseconds();
+      }
+    }
+    return cost;
+  }
+
+  @Override
+  public boolean contains(Candidate candidate) {
+    if (this.equals(candidate)) {
+      return true;
+    }
+    for (Candidate child : childCandidates) {
+      if (child.contains(candidate)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Collection<Candidate> getChildren() {
+    return childCandidates;
+  }
+
+  /**
+   * Evaluates completeness by splitting the time range among the children and
+   * delegating to each participating child.
+   *
+   * @param timeRange time range to evaluate for this union candidate
+   * @return true only if every participating child reports complete data
+   */
+  @Override
+  public boolean evaluateCompleteness(TimeRange timeRange, TimeRange parentTimeRange, boolean failOnPartialData)
+    throws LensException {
+    Map<Candidate, TimeRange> candidateRange = getTimeRangeSplit(timeRange);
+    boolean ret = true;
+    for (Map.Entry<Candidate, TimeRange> entry : candidateRange.entrySet()) {
+      ret &= entry.getKey().evaluateCompleteness(entry.getValue(), parentTimeRange, failOnPartialData);
+    }
+    return ret;
+  }
+
+  @Override
+  public Set<FactPartition> getParticipatingPartitions() {
+    Set<FactPartition> factPartitionSet = new HashSet<>();
+    for (Candidate c : childCandidates) {
+      factPartitionSet.addAll(c.getParticipatingPartitions());
+    }
+    return factPartitionSet;
+  }
+
+  @Override
+  public boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr) {
+    for (Candidate cand : childCandidates) {
+      if (!cand.isExpressionEvaluable(expr)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    if (this.toStr == null) {
+      this.toStr = getToString();
+    }
+    return this.toStr;
+  }
+
+  private String getToString() {
+    StringBuilder builder = new StringBuilder(10 * childCandidates.size());
+    builder.append("UNION[");
+    for (Candidate candidate : childCandidates) {
+      builder.append(candidate.toString());
+      builder.append(", ");
+    }
+    builder.delete(builder.length() - 2, builder.length());
+    builder.append("]");
+    return builder.toString();
+  }
+
+  /**
+   * Splits the parent time range among the child candidates. Children are tried in
+   * ascending order of cost, so cheaper candidates claim their portion of the range first.
+   *
+   * @param timeRange parent time range to split
+   * @return mapping from each participating child to the portion of the range it covers
+   */
+  private Map<Candidate, TimeRange> splitTimeRangeForChildren(TimeRange timeRange) {
+    childCandidates.sort(Comparator.comparing(Candidate::getCost));
+    Map<Candidate, TimeRange> childrenTimeRangeMap = new HashMap<>();
+    // Remaining sub-ranges of the parent range that are not yet assigned to any child.
+    Set<TimeRange> ranges = new HashSet<>();
+    ranges.add(timeRange);
+    for (Candidate c : childCandidates) {
+      TimeRange.TimeRangeBuilder builder = getClonedBuilder(timeRange);
+      TimeRange tr = resolveTimeRangeForChildren(c, ranges, builder);
+      if (tr != null) {
+        // If the time range is not null it means this child candidate is valid for this union candidate.
+        childrenTimeRangeMap.put(c, tr);
+      }
+    }
+    return childrenTimeRangeMap;
+  }
+  private Map<Candidate, TimeRange> getTimeRangeSplit(TimeRange range) {
+    return splitTimeRangeMap.computeIfAbsent(range, this::splitTimeRangeForChildren);
+  }
+
+  /**
+   * Resolves the time range for this candidate based on overlap.
+   *
+   * @param candidate : Candidate for which the time range is to be calculated
+   * @param ranges    : Set of time ranges from which one has to be chosen.
+   * @param builder   : TimeRange builder created from the common AST.
+   * @return Calculated timeRange for the candidate. A null return means there is no suitable time range split for
+   * this candidate. This is the correct behaviour, because a union candidate can have non-participating child
+   * candidates for the parent time range.
+   */
+  private TimeRange resolveTimeRangeForChildren(Candidate candidate, Set<TimeRange> ranges,
+    TimeRange.TimeRangeBuilder builder) {
+    Iterator<TimeRange> it = ranges.iterator();
+    Set<TimeRange> newTimeRanges = new HashSet<>();
+    TimeRange ret = null;
+    while (it.hasNext()) {
+      TimeRange range = it.next();
+      // Check for out of range
+      if (candidate.getStartTime().getTime() >= range.getToDate().getTime() || candidate.getEndTime().getTime() <= range
+        .getFromDate().getTime()) {
+        continue;
+      }
+      // This means overlap.
+      if (candidate.getStartTime().getTime() <= range.getFromDate().getTime()) {
+        // Start time of the new time range will be range.getFromDate()
+        builder.fromDate(range.getFromDate());
+        if (candidate.getEndTime().getTime() <= range.getToDate().getTime()) {
+          // Candidate's end time falls within the range, so the new range ends at candidate.getEndTime().
+          builder.toDate(candidate.getEndTime());
+        } else {
+          // End time will be range.getToDate()
+          builder.toDate(range.getToDate());
+        }
+      } else {
+        builder.fromDate(candidate.getStartTime());
+        if (candidate.getEndTime().getTime() <= range.getToDate().getTime()) {
+          builder.toDate(candidate.getEndTime());
+        } else {
+          builder.toDate(range.getToDate());
+        }
+      }
+      // Remove the time range and add more time ranges.
+      it.remove();
+      ret = builder.build();
+      if (ret.getFromDate().getTime() == range.getFromDate().getTime()) {
+        checkAndUpdateNewTimeRanges(ret, range, newTimeRanges);
+      } else {
+        TimeRange.TimeRangeBuilder b1 = getClonedBuilder(ret);
+        b1.fromDate(range.getFromDate());
+        b1.toDate(ret.getFromDate());
+        newTimeRanges.add(b1.build());
+        checkAndUpdateNewTimeRanges(ret, range, newTimeRanges);
+      }
+      break;
+    }
+    ranges.addAll(newTimeRanges);
+    return ret;
+  }
+
+  private void checkAndUpdateNewTimeRanges(TimeRange ret, TimeRange range, Set<TimeRange> newTimeRanges) {
+    if (ret.getToDate().getTime() < range.getToDate().getTime()) {
+      TimeRange.TimeRangeBuilder b2 = getClonedBuilder(ret);
+      b2.fromDate(ret.getToDate());
+      b2.toDate(range.getToDate());
+      newTimeRanges.add(b2.build());
+    }
+  }
+
+  private TimeRange.TimeRangeBuilder getClonedBuilder(TimeRange timeRange) {
+    TimeRange.TimeRangeBuilder builder = new TimeRange.TimeRangeBuilder();
+    builder.astNode(timeRange.getAstNode());
+    builder.childIndex(timeRange.getChildIndex());
+    builder.parent(timeRange.getParent());
+    builder.partitionColumn(timeRange.getPartitionColumn());
+    return builder;
+  }
+}
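
For illustration, a minimal standalone sketch of the clipping rule inside
resolveTimeRangeForChildren(): a child candidate is assigned the intersection of its
validity window with a queried sub-range, and any leftover sub-ranges remain available
for costlier children. Dates are modeled as plain longs here, and the values in main()
are hypothetical; the real code works with TimeRange builders:

import java.util.Arrays;

public class RangeSplitDemo {
  // Returns the overlap of [candStart, candEnd) with [from, to), or null if disjoint.
  static long[] clip(long candStart, long candEnd, long from, long to) {
    if (candStart >= to || candEnd <= from) {
      return null; // out of range: this child does not participate
    }
    return new long[] {Math.max(candStart, from), Math.min(candEnd, to)};
  }

  public static void main(String[] args) {
    // Query covers [0, 100); the cheaper child is valid over [0, 60),
    // so the costlier child only gets the leftover sub-range [60, 100).
    long[] first = clip(0, 60, 0, 100);
    long[] second = clip(40, 100, 60, 100);
    System.out.println(Arrays.toString(first) + " UNION " + Arrays.toString(second));
  }
}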

http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java
deleted file mode 100644
index e6ee989..0000000
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.lens.cube.parse;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.lens.server.api.error.LensException;
-
-import org.apache.commons.lang.NotImplementedException;
-
-import lombok.AllArgsConstructor;
-import lombok.RequiredArgsConstructor;
-
-@AllArgsConstructor
-@RequiredArgsConstructor
-public abstract class UnionHQLContext extends SimpleHQLContext {
-  protected final CubeQueryContext query;
-  protected final CandidateFact fact;
-
-  List<HQLContextInterface> hqlContexts = new ArrayList<>();
-
-  public void setHqlContexts(List<HQLContextInterface> hqlContexts) throws LensException {
-    this.hqlContexts = hqlContexts;
-    StringBuilder queryParts = new StringBuilder("(");
-    String sep = "";
-    for (HQLContextInterface ctx : hqlContexts) {
-      queryParts.append(sep).append(ctx.toHQL());
-      sep = " UNION ALL ";
-    }
-    setFrom(queryParts.append(") ").append(query.getCube().getName()).toString());
-  }
-
-  @Override
-  public String getWhere() {
-    throw new NotImplementedException("Not Implemented");
-  }
-}
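
For reference, a minimal standalone sketch of the UNION ALL assembly that the deleted
setHqlContexts() performed: each child query becomes one branch, and the union is
wrapped in parentheses and aliased so the outer query can select from it. The child
queries and the alias below are illustrative:

import java.util.Arrays;
import java.util.List;

public class UnionFromDemo {
  static String unionFrom(List<String> childQueries, String alias) {
    StringBuilder queryParts = new StringBuilder("(");
    String sep = "";
    for (String q : childQueries) {
      queryParts.append(sep).append(q);
      sep = " UNION ALL ";
    }
    return queryParts.append(") ").append(alias).toString();
  }

  public static void main(String[] args) {
    System.out.println(unionFrom(Arrays.asList(
      "SELECT dt, sum(msr1) FROM fact_daily GROUP BY dt",
      "SELECT dt, sum(msr1) FROM fact_hourly GROUP BY dt"), "salescube"));
  }
}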