You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pr...@apache.org on 2017/02/28 12:48:26 UTC
[8/8] lens git commit: LENS-1389: Back Merge with master and fix
lens-cube tests
LENS-1389: Back Merge with master and fix lens-cube tests
Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/2aaf6e0a
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/2aaf6e0a
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/2aaf6e0a
Branch: refs/heads/lens-1381
Commit: 2aaf6e0a0345f3328bac0bf8ccfaeff99ca05c0f
Parents: 4d49359
Author: Rajat Khandelwal <pr...@apache.org>
Authored: Tue Feb 28 18:17:15 2017 +0530
Committer: Rajat Khandelwal <ra...@gmail.com>
Committed: Tue Feb 28 18:17:15 2017 +0530
----------------------------------------------------------------------
lens-api/src/main/resources/cube-0.1.xsd | 30 +-
.../NoCandidateFactAvailableException.java | 1 +
.../lens/cube/metadata/CubeFactTable.java | 68 ++-
.../lens/cube/metadata/CubeMetastoreClient.java | 339 ++++++-----
.../org/apache/lens/cube/metadata/DateUtil.java | 28 +-
.../lens/cube/metadata/FactPartition.java | 5 +-
.../lens/cube/metadata/MetastoreUtil.java | 6 +
.../org/apache/lens/cube/metadata/Storage.java | 30 +-
.../apache/lens/cube/metadata/TimeRange.java | 18 +-
.../cube/parse/AbridgedTimeRangeWriter.java | 45 +-
.../lens/cube/parse/BetweenTimeRangeWriter.java | 2 +-
.../parse/CandidateCoveringSetsResolver.java | 35 +-
.../apache/lens/cube/parse/CandidateTable.java | 1 -
.../cube/parse/CandidateTablePruneCause.java | 151 ++---
.../lens/cube/parse/CandidateTableResolver.java | 36 +-
.../apache/lens/cube/parse/CandidateUtil.java | 85 +--
.../apache/lens/cube/parse/CheckTableNames.java | 1 -
.../lens/cube/parse/ColumnLifetimeChecker.java | 131 ++++
.../lens/cube/parse/CubeQueryContext.java | 337 +++++------
.../lens/cube/parse/CubeQueryRewriter.java | 12 +-
.../lens/cube/parse/CubeSemanticAnalyzer.java | 14 +-
.../cube/parse/DenormalizationResolver.java | 227 ++++---
.../lens/cube/parse/ExpressionResolver.java | 287 ++++-----
.../apache/lens/cube/parse/FieldValidator.java | 1 -
.../cube/parse/MaxCoveringFactResolver.java | 7 +-
.../org/apache/lens/cube/parse/PruneCauses.java | 29 +-
.../lens/cube/parse/QueriedPhraseContext.java | 2 +-
.../lens/cube/parse/StorageCandidate.java | 148 +++--
.../lens/cube/parse/StorageTableResolver.java | 147 +++--
.../org/apache/lens/cube/parse/StorageUtil.java | 1 -
.../lens/cube/parse/TimeRangeChecker.java | 238 --------
.../lens/cube/parse/TrackDenormContext.java | 37 ++
.../lens/cube/parse/UnionQueryWriter.java | 2 -
.../FactPartitionBasedQueryCostCalculator.java | 3 +
.../cube/metadata/TestCubeMetastoreClient.java | 151 ++++-
.../apache/lens/cube/parse/CubeTestSetup.java | 14 +-
.../FieldsCannotBeQueriedTogetherTest.java | 2 +-
.../lens/cube/parse/TestBaseCubeQueries.java | 3 +-
.../cube/parse/TestBetweenTimeRangeWriter.java | 51 +-
.../lens/cube/parse/TestCubeRewriter.java | 51 +-
.../cube/parse/TestDenormalizationResolver.java | 73 +--
.../lens/cube/parse/TestExpressionResolver.java | 16 +
.../lens/cube/parse/TestJoinResolver.java | 2 +-
.../lens/cube/parse/TestQueryMetrics.java | 50 +-
.../lens/cube/parse/TestTimeRangeResolver.java | 21 +-
.../parse/TestTimeRangeWriterWithQuery.java | 1 -
.../lens/cube/parse/TestUnionQueries.java | 15 +-
...stFactPartitionBasedQueryCostCalculator.java | 21 +-
.../lens/driver/jdbc/ColumnarSQLRewriter.java | 2 +-
.../lens/driver/jdbc/DruidSQLRewriter.java | 2 +-
.../src/test/resources/yaml/city_table.yaml | 3 +-
.../src/test/resources/yaml/customer_table.yaml | 3 +-
.../src/test/resources/yaml/dim_table.yaml | 3 +-
.../src/test/resources/yaml/dim_table2.yaml | 3 +-
.../src/test/resources/yaml/dim_table4.yaml | 3 +-
.../src/test/resources/yaml/fact1.yaml | 3 +-
.../src/test/resources/yaml/fact2.yaml | 3 +-
.../src/test/resources/yaml/product_table.yaml | 3 +-
.../src/test/resources/yaml/rawfact.yaml | 3 +-
.../yaml/sales-aggr-continuous-fact.yaml | 3 +-
.../test/resources/yaml/sales-aggr-fact1.yaml | 6 +-
.../test/resources/yaml/sales-aggr-fact2.yaml | 6 +-
.../src/test/resources/yaml/sales-raw-fact.yaml | 3 +-
.../regression/core/constants/DriverConfig.java | 2 -
.../core/helpers/ScheduleResourceHelper.java | 62 +-
.../apache/lens/regression/util/AssertUtil.java | 1 -
.../src/main/resources/template.lens.properties | 9 +-
.../apache/lens/regression/ITSessionTests.java | 163 +++++
.../apache/lens/regression/SessionTests.java | 163 -----
.../client/ITDuplicateQueryTests.java | 188 ++++++
.../regression/client/ITKillQueryTests.java | 361 +++++++++++
.../lens/regression/client/ITListQueryTest.java | 7 +-
.../regression/client/ITPreparedQueryTests.java | 13 +-
.../lens/regression/client/ITQueryApiTests.java | 182 ++----
.../regression/client/ITScheduleQueryTests.java | 284 ---------
.../client/ITSessionResourceTests.java | 401 ++++++++++++
.../lens/regression/client/KillQueryTests.java | 362 -----------
.../regression/client/SessionResourceTests.java | 403 ------------
.../regression/config/ITServerConfigTests.java | 197 +++---
.../regression/config/ITSessionConfigTests.java | 4 +-
.../scheduler/ITMaxScheduledQueryTests.java | 160 +++++
.../scheduler/ITScheduleQueryTests.java | 337 +++++++++++
.../lens/regression/throttling/ITCostTests.java | 176 +-----
.../throttling/ITQueueNumberTests.java | 232 +++++++
.../throttling/ITThrottlingTests.java | 605 +++++++++++++++++++
.../lens/regression/throttling/Throttling.java | 604 ------------------
.../metastore/CubeMetastoreServiceImpl.java | 182 ++++--
.../apache/lens/server/metastore/JAXBUtils.java | 68 ++-
.../apache/lens/server/query/LensServerDAO.java | 6 +-
.../server/query/QueryExecutionServiceImpl.java | 3 +-
.../lens/server/session/HiveSessionService.java | 58 +-
.../lens/server/session/LensSessionImpl.java | 20 +-
.../src/main/resources/lensserver-default.xml | 11 +
.../server/metastore/TestMetastoreService.java | 186 +++++-
.../TestQueryIndependenceFromSessionClose.java | 71 ++-
pom.xml | 4 +-
src/site/apt/admin/config.apt | 128 ++--
tools/conf/server/logback.xml | 4 +-
98 files changed, 4826 insertions(+), 3855 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-api/src/main/resources/cube-0.1.xsd
----------------------------------------------------------------------
diff --git a/lens-api/src/main/resources/cube-0.1.xsd b/lens-api/src/main/resources/cube-0.1.xsd
index f438f48..060eb43 100644
--- a/lens-api/src/main/resources/cube-0.1.xsd
+++ b/lens-api/src/main/resources/cube-0.1.xsd
@@ -681,8 +681,27 @@
</xs:complexType>
<xs:complexType name="x_update_periods">
- <xs:sequence>
+ <xs:annotation>
+ <xs:documentation>
+ A list of update_period which contains either update period table descriptor or list of update_peroid enum.
+ </xs:documentation>
+ </xs:annotation>
+ <xs:choice maxOccurs="1" minOccurs="0">
+ <xs:element name="update_period_table_descriptor" type="x_update_period_table_descriptor" maxOccurs="unbounded"
+ minOccurs="0"/>
<xs:element name="update_period" type="x_update_period" maxOccurs="unbounded" minOccurs="0"/>
+ </xs:choice>
+ </xs:complexType>
+
+ <xs:complexType name="x_update_period_table_descriptor">
+ <xs:annotation>
+ <xs:documentation>
+ An update period descriptor keeps an enum of update period and a storage table descriptor.
+ </xs:documentation>
+ </xs:annotation>
+ <xs:sequence>
+ <xs:element name="update_period" type="x_update_period" maxOccurs="1" minOccurs="1"/>
+ <xs:element name="table_desc" type="x_storage_table_desc" maxOccurs="1" minOccurs="1"/>
</xs:sequence>
</xs:complexType>
@@ -1001,14 +1020,15 @@
<xs:complexType name="x_storage_table_element">
<xs:annotation>
<xs:documentation>
- Storage and storage table description and update periods
+ Storage and storage table description and update periods. table_desc is invalid when update_periods has a list
+ of update_period_table_descriptor instead of a list of enums.
</xs:documentation>
</xs:annotation>
- <xs:sequence>
+ <xs:all>
<xs:element name="update_periods" type="x_update_periods" maxOccurs="1" minOccurs="0"/>
<xs:element name="storage_name" type="xs:string"/>
- <xs:element type="x_storage_table_desc" name="table_desc"/>
- </xs:sequence>
+ <xs:element type="x_storage_table_desc" name="table_desc" maxOccurs="1" minOccurs="0"/>
+ </xs:all>
</xs:complexType>
<xs:complexType name="x_storage_tables">
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/error/NoCandidateFactAvailableException.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/error/NoCandidateFactAvailableException.java b/lens-cube/src/main/java/org/apache/lens/cube/error/NoCandidateFactAvailableException.java
index bdfa3a0..6f08d0f 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/error/NoCandidateFactAvailableException.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/error/NoCandidateFactAvailableException.java
@@ -34,6 +34,7 @@ public class NoCandidateFactAvailableException extends LensException {
@Getter
private final CubeQueryContext cubeQueryContext;
+ @Getter
private final PruneCauses<StorageCandidate> briefAndDetailedError;
public NoCandidateFactAvailableException(CubeQueryContext cubeql) {
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java
index adb6c92..896a7a1 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java
@@ -29,10 +29,14 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.metadata.Table;
import com.google.common.collect.Lists;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CubeFactTable extends AbstractCubeTable {
+ @Getter
+ // Map<StorageName, Map<update_period, storage_table_prefix>>
+ private final Map<String, Map<UpdatePeriod, String>> storagePrefixUpdatePeriodMap;
private String cubeName;
private final Map<String, Set<UpdatePeriod>> storageUpdatePeriods;
@@ -40,8 +44,10 @@ public class CubeFactTable extends AbstractCubeTable {
super(hiveTable);
this.storageUpdatePeriods = getUpdatePeriods(getName(), getProperties());
this.cubeName = getCubeName(getName(), getProperties());
+ this.storagePrefixUpdatePeriodMap = getUpdatePeriodMap(getName(), getProperties());
}
+
public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns,
Map<String, Set<UpdatePeriod>> storageUpdatePeriods) {
this(cubeName, factName, columns, storageUpdatePeriods, 0L, new HashMap<String, String>());
@@ -54,9 +60,18 @@ public class CubeFactTable extends AbstractCubeTable {
public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns,
Map<String, Set<UpdatePeriod>> storageUpdatePeriods, double weight, Map<String, String> properties) {
+ this(cubeName, factName, columns, storageUpdatePeriods, weight, properties,
+ new HashMap<String, Map<UpdatePeriod, String>>());
+
+ }
+
+ public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns,
+ Map<String, Set<UpdatePeriod>> storageUpdatePeriods, double weight, Map<String, String> properties,
+ Map<String, Map<UpdatePeriod, String>> storagePrefixUpdatePeriodMap) {
super(factName, columns, properties, weight);
this.cubeName = cubeName;
this.storageUpdatePeriods = storageUpdatePeriods;
+ this.storagePrefixUpdatePeriodMap = storagePrefixUpdatePeriodMap;
addProperties();
}
@@ -65,6 +80,18 @@ public class CubeFactTable extends AbstractCubeTable {
super.addProperties();
addCubeNames(getName(), getProperties(), cubeName);
addUpdatePeriodProperies(getName(), getProperties(), storageUpdatePeriods);
+ addStorageTableProperties(getName(), getProperties(), storagePrefixUpdatePeriodMap);
+ }
+
+ private void addStorageTableProperties(String name, Map<String, String> properties,
+ Map<String, Map<UpdatePeriod, String>> storageUpdatePeriodMap) {
+ for (String storageName : storageUpdatePeriodMap.keySet()) {
+ String prefix = MetastoreUtil.getFactKeyPrefix(name) + "." + storageName;
+ for (Map.Entry updatePeriodEntry : storageUpdatePeriodMap.get(storageName).entrySet()) {
+ String updatePeriod = ((UpdatePeriod) updatePeriodEntry.getKey()).getName();
+ properties.put(prefix + "." + updatePeriod, (String) updatePeriodEntry.getValue());
+ }
+ }
}
private static void addUpdatePeriodProperies(String name, Map<String, String> props,
@@ -82,7 +109,29 @@ public class CubeFactTable extends AbstractCubeTable {
props.put(MetastoreUtil.getFactCubeNameKey(factName), cubeName);
}
- private static Map<String, Set<UpdatePeriod>> getUpdatePeriods(String name, Map<String, String> props) {
+ private Map<String, Map<UpdatePeriod, String>> getUpdatePeriodMap(String factName, Map<String, String> props) {
+ Map<String, Map<UpdatePeriod, String>> ret = new HashMap<>();
+ for (Map.Entry entry : storageUpdatePeriods.entrySet()) {
+ String storage = (String) entry.getKey();
+ for (UpdatePeriod period : (Set<UpdatePeriod>) entry.getValue()) {
+ String storagePrefixKey = MetastoreUtil
+ .getUpdatePeriodStoragePrefixKey(factName.trim(), storage, period.getName());
+ String storageTableNamePrefix = props.get(storagePrefixKey);
+ if (storageTableNamePrefix == null) {
+ storageTableNamePrefix = storage;
+ }
+ Map<UpdatePeriod, String> mapOfUpdatePeriods = ret.get(storage);
+ if (mapOfUpdatePeriods == null) {
+ mapOfUpdatePeriods = new HashMap<>();
+ ret.put(storage, mapOfUpdatePeriods);
+ }
+ mapOfUpdatePeriods.put(period, storageTableNamePrefix);
+ }
+ }
+ return ret;
+ }
+
+ private Map<String, Set<UpdatePeriod>> getUpdatePeriods(String name, Map<String, String> props) {
Map<String, Set<UpdatePeriod>> storageUpdatePeriods = new HashMap<>();
String storagesStr = props.get(MetastoreUtil.getFactStorageListKey(name));
if (!StringUtils.isBlank(storagesStr)) {
@@ -273,13 +322,16 @@ public class CubeFactTable extends AbstractCubeTable {
/**
* Add a storage with specified update periods
- *
* @param storage
* @param updatePeriods
+ * @param updatePeriodStoragePrefix
*/
- void addStorage(String storage, Set<UpdatePeriod> updatePeriods) {
+ void addStorage(String storage, Set<UpdatePeriod> updatePeriods,
+ Map<UpdatePeriod, String> updatePeriodStoragePrefix) {
storageUpdatePeriods.put(storage, updatePeriods);
+ storagePrefixUpdatePeriodMap.put(storage, updatePeriodStoragePrefix);
addUpdatePeriodProperies(getName(), getProperties(), storageUpdatePeriods);
+ addStorageTableProperties(getName(), getProperties(), storagePrefixUpdatePeriodMap);
}
/**
@@ -289,6 +341,12 @@ public class CubeFactTable extends AbstractCubeTable {
*/
void dropStorage(String storage) {
storageUpdatePeriods.remove(storage);
+ String prefix = MetastoreUtil.getFactKeyPrefix(getName()) + "." + storage;
+ for (Map.Entry updatePeriodEntry : storagePrefixUpdatePeriodMap.get(storage).entrySet()) {
+ String updatePeriod = ((UpdatePeriod)updatePeriodEntry.getKey()).getName();
+ getProperties().remove(prefix + "." + updatePeriod);
+ }
+ storagePrefixUpdatePeriodMap.remove(storage);
getProperties().remove(MetastoreUtil.getFactUpdatePeriodKey(getName(), storage));
String newStorages = StringUtils.join(storageUpdatePeriods.keySet(), ",");
getProperties().put(MetastoreUtil.getFactStorageListKey(getName()), newStorages);
@@ -351,5 +409,7 @@ public class CubeFactTable extends AbstractCubeTable {
return Collections.min(Lists.newArrayList(getRelativeEndTime(), getAbsoluteEndTime()));
}
-
+ public String getTablePrefix(String storage, UpdatePeriod updatePeriod) {
+ return storagePrefixUpdatePeriodMap.get(storage).get(updatePeriod);
+ }
}
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java
index aa2e9d1..78fb6d3 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java
@@ -31,7 +31,7 @@ import org.apache.lens.cube.metadata.Storage.LatestInfo;
import org.apache.lens.cube.metadata.Storage.LatestPartColumnInfo;
import org.apache.lens.cube.metadata.timeline.PartitionTimeline;
import org.apache.lens.cube.metadata.timeline.PartitionTimelineFactory;
-import org.apache.lens.server.api.*;
+import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.metastore.DataCompletenessChecker;
import org.apache.lens.server.api.util.LensUtil;
@@ -121,7 +121,13 @@ public class CubeMetastoreClient {
if (ind <= 0) {
throw new LensException("storageTable: " + storageTableName + ", does not belong to fact: " + fact.getName());
}
- return storageTableName.substring(0, ind - StorageConstants.STORGAE_SEPARATOR.length());
+ String name = storageTableName.substring(0, ind - StorageConstants.STORGAE_SEPARATOR.length());
+ for (String storageName : fact.getStorages()) {
+ if (name.equalsIgnoreCase(storageName)) {
+ return storageName;
+ }
+ }
+ throw new LensException("storageTable: " + storageTableName + ", does not belong to fact: " + fact.getName());
}
/**
@@ -169,11 +175,11 @@ public class CubeMetastoreClient {
UpdatePeriod updatePeriod = updatePeriodStr == null ? null : UpdatePeriod.valueOf(updatePeriodStr.toUpperCase());
List<PartitionTimeline> ret = Lists.newArrayList();
CubeFactTable fact = getCubeFact(factName);
- List<String> keys = Lists.newArrayList();
+ List<String> storageList = Lists.newArrayList();
if (storage != null) {
- keys.add(storage);
+ storageList.add(storage);
} else {
- keys.addAll(fact.getStorages());
+ storageList.addAll(fact.getStorages());
}
String partCol = null;
if (timeDimension != null) {
@@ -186,9 +192,9 @@ public class CubeMetastoreClient {
}
partCol = baseCube.getPartitionColumnOfTimeDim(timeDimension);
}
- for (String key : keys) {
+ for (String storageName : storageList) {
for (Map.Entry<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>> entry : partitionTimelineCache
- .get(factName, key).entrySet()) {
+ .get(factName, storageName).entrySet()) {
if (updatePeriod == null || entry.getKey().equals(updatePeriod)) {
for (Map.Entry<String, PartitionTimeline> entry1 : entry.getValue().entrySet()) {
if (partCol == null || partCol.equals(entry1.getKey())) {
@@ -201,25 +207,30 @@ public class CubeMetastoreClient {
return ret;
}
- public void updatePartition(String fact, String storageName, Partition partition)
+ public void updatePartition(String fact, String storageName, Partition partition, UpdatePeriod updatePeriod)
throws HiveException, InvalidOperationException, LensException {
- updatePartitions(fact, storageName, Collections.singletonList(partition));
+ Map<UpdatePeriod, List<Partition>> updatePeriodListMap = new HashMap<>();
+ updatePeriodListMap.put(updatePeriod, Collections.singletonList(partition));
+ updatePartitions(fact, storageName, updatePeriodListMap);
}
- public void updatePartitions(String factOrDimtableName, String storageName, List<Partition> partitions)
- throws HiveException, InvalidOperationException, LensException {
- List<Partition> partitionsToAlter = Lists.newArrayList();
- partitionsToAlter.addAll(partitions);
- partitionsToAlter.addAll(getAllLatestPartsEquivalentTo(factOrDimtableName, storageName, partitions));
- getStorage(storageName).updatePartitions(getClient(), factOrDimtableName, partitionsToAlter);
+ public void updatePartitions(String factOrDimtableName, String storageName,
+ Map<UpdatePeriod, List<Partition>> partitions) throws HiveException, InvalidOperationException, LensException {
+ for (Map.Entry entry : partitions.entrySet()) {
+ List<Partition> partitionsToAlter = Lists.newArrayList();
+ partitionsToAlter.addAll((List<Partition>) entry.getValue());
+ String storageTableName = getStorageTableName(factOrDimtableName, storageName, (UpdatePeriod) entry.getKey());
+ partitionsToAlter.addAll(
+ getAllLatestPartsEquivalentTo(factOrDimtableName, storageTableName, (List<Partition>) entry.getValue()));
+ getStorage(storageName).updatePartitions(storageTableName, getClient(), factOrDimtableName, partitionsToAlter);
+ }
}
- private List<Partition> getAllLatestPartsEquivalentTo(String factOrDimtableName, String storageName,
+ private List<Partition> getAllLatestPartsEquivalentTo(String factOrDimtableName, String storageTableName,
List<Partition> partitions) throws HiveException, LensException {
if (isFactTable(factOrDimtableName)) {
return Lists.newArrayList();
}
- String storageTableName = getFactOrDimtableStorageTableName(factOrDimtableName, storageName);
Table storageTable = getTable(storageTableName);
List<String> timePartCols = getTimePartColNamesOfTable(storageTable);
List<Partition> latestParts = Lists.newArrayList();
@@ -279,6 +290,17 @@ public class CubeMetastoreClient {
}
}
+ public void createCubeFactTable(String cubeName, String factName, List<FieldSchema> columns,
+ Map<String, Set<UpdatePeriod>> storageAggregatePeriods, double weight, Map<String, String> properties,
+ Map<String, StorageTableDesc> storageTableDescs, Map<String, Map<UpdatePeriod, String>> storageUpdatePeriodMap)
+ throws LensException {
+ CubeFactTable factTable = new CubeFactTable(cubeName, factName, columns, storageAggregatePeriods, weight,
+ properties, storageUpdatePeriodMap);
+ createCubeTable(factTable, storageTableDescs);
+ // do a get to update cache
+ getCubeFact(factName);
+
+ }
/**
* In-memory storage of {@link PartitionTimeline} objects for each valid
@@ -327,48 +349,75 @@ public class CubeMetastoreClient {
public TreeMap<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>> get(String fact, String storage)
throws HiveException, LensException {
// SUSPEND CHECKSTYLE CHECK DoubleCheckedLockingCheck
- String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage));
- if (get(storageTableName) == null) {
- synchronized (this) {
- if (get(storageTableName) == null) {
- Table storageTable = getTable(storageTableName);
- if ("true".equalsIgnoreCase(storageTable.getParameters().get(getPartitionTimelineCachePresenceKey()))) {
- try {
- loadTimelinesFromTableProperties(fact, storage);
- } catch (Exception e) {
- // Ideally this should never come. But since we have another source,
- // let's piggyback on that for loading timeline
- log.error("Error while loading timelines from table properties.", e);
- loadTimelinesFromAllPartitions(fact, storage);
- }
- } else {
- loadTimelinesFromAllPartitions(fact, storage);
+ // Unique key for the timeline cache, based on storageName and fact.
+ String timeLineKey = (Storage.getPrefix(storage)+ fact).toLowerCase();
+ synchronized (this) {
+ if (get(timeLineKey) == null) {
+ loadTimeLines(fact, storage, timeLineKey);
+ }
+ log.info("timeline for {} is: {}", storage, get(timeLineKey));
+ // return the final value from memory
+ return get(timeLineKey);
+ // RESUME CHECKSTYLE CHECK DoubleCheckedLockingCheck
+ }
+ }
+
+ /**
+ * @param fact
+ * @param storage
+ */
+ private void loadTimeLines(String fact, String storage, String timeLineKey) throws LensException, HiveException {
+ Set<String> uniqueStorageTables = new HashSet<>();
+ Map<UpdatePeriod, String> updatePeriodTableName = new HashMap<>();
+ for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) {
+ String storageTableName = getStorageTableName(fact, storage, updatePeriod);
+ updatePeriodTableName.put(updatePeriod, storageTableName);
+ Table storageTable = getTable(storageTableName);
+ if ("true".equalsIgnoreCase(storageTable.getParameters().get(getPartitionTimelineCachePresenceKey()))) {
+ try {
+ loadTimelinesFromTableProperties(updatePeriod, storageTableName, timeLineKey);
+ } catch (Exception e) {
+ // Ideally this should never come. But since we have another source,
+ // let's piggyback on that for loading timeline
+ log.error("Error while loading timelines from table properties.", e);
+ ensureEntryForTimeLineKey(fact, storage, updatePeriod, storageTableName, timeLineKey);
+ if (!uniqueStorageTables.contains(storageTableName)) {
+ uniqueStorageTables.add(storageTableName);
+ loadTimelinesFromAllPartitions(storageTableName, timeLineKey);
}
}
+ } else {
+ ensureEntryForTimeLineKey(fact, storage, updatePeriod, storageTableName, timeLineKey);
+ if (!uniqueStorageTables.contains(storageTableName)) {
+ uniqueStorageTables.add(storageTableName);
+ loadTimelinesFromAllPartitions(storageTableName, timeLineKey);
+ }
}
- log.info("timeline for {} is: {}", storageTableName, get(storageTableName));
}
- // return the final value from memory
- return get(storageTableName);
- // RESUME CHECKSTYLE CHECK DoubleCheckedLockingCheck
+ for (Map.Entry entry : updatePeriodTableName.entrySet()) {
+ alterTablePartitionCache(timeLineKey, (UpdatePeriod) entry.getKey(), (String) entry.getValue());
+ }
}
- private void loadTimelinesFromAllPartitions(String fact, String storage) throws HiveException, LensException {
+ private void ensureEntryForTimeLineKey(String fact, String storage, UpdatePeriod updatePeriod,
+ String storageTableName, String timeLineKey) throws LensException {
// Not found in table properties either, compute from all partitions of the fact-storage table.
// First make sure all combinations of update period and partition column have an entry even
// if no partitions exist
- String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage));
- log.info("loading from all partitions: {}", storageTableName);
- Table storageTable = getTable(storageTableName);
- if (getCubeFact(fact).getUpdatePeriods() != null && getCubeFact(fact).getUpdatePeriods().get(
- storage) != null) {
- for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) {
- for (String partCol : getTimePartColNamesOfTable(storageTable)) {
- ensureEntry(storageTableName, updatePeriod, partCol);
- }
+ if (getCubeFact(fact).getUpdatePeriods() != null && getCubeFact(fact).getUpdatePeriods().get(storage) != null) {
+ log.info("loading from all partitions: {}", storageTableName);
+ Table storageTable = getTable(storageTableName);
+ for (String partCol : getTimePartColNamesOfTable(storageTable)) {
+ ensureEntry(timeLineKey, storageTableName, updatePeriod, partCol);
}
}
+
+ }
+
+ private void loadTimelinesFromAllPartitions(String storageTableName, String timeLineKey)
+ throws HiveException, LensException {
// Then add all existing partitions for batch addition in respective timelines.
+ Table storageTable = getTable(storageTableName);
List<String> timeParts = getTimePartColNamesOfTable(storageTable);
List<FieldSchema> partCols = storageTable.getPartCols();
for (Partition partition : getPartitionsByFilter(storageTableName, null)) {
@@ -382,23 +431,17 @@ public class CubeMetastoreClient {
}
for (int i = 0; i < partCols.size(); i++) {
if (timeParts.contains(partCols.get(i).getName())) {
- addForBatchAddition(storageTableName, period, partCols.get(i).getName(), values.get(i));
+ addForBatchAddition(timeLineKey, storageTableName, period, partCols.get(i).getName(), values.get(i));
}
}
}
- // commit all batch addition for the storage table,
- // which will in-turn commit all batch additions in all it's timelines.
- commitAllBatchAdditions(storageTableName);
}
- private void loadTimelinesFromTableProperties(String fact, String storage) throws HiveException, LensException {
- // found in table properties, load from there.
- String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage));
+ private void loadTimelinesFromTableProperties(UpdatePeriod updatePeriod,
+ String storageTableName, String timeLineKey) throws HiveException, LensException {
log.info("loading from table properties: {}", storageTableName);
- for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) {
- for (String partCol : getTimePartColNamesOfTable(storageTableName)) {
- ensureEntry(storageTableName, updatePeriod, partCol).init(getTable(storageTableName));
- }
+ for (String partCol : getTimePartColNamesOfTable(storageTableName)) {
+ ensureEntry(timeLineKey, storageTableName, updatePeriod, partCol).init(getTable(storageTableName));
}
}
@@ -406,16 +449,17 @@ public class CubeMetastoreClient {
* Adds given partition(for storageTable, updatePeriod, partitionColum=partition) for batch addition in an
* appropriate timeline object. Ignore if partition is not valid.
*
- * @param storageTable storage table
+ * @param timeLineKey key for the timeLine map
+ * @param storageTableName hive table name
* @param updatePeriod update period
* @param partitionColumn partition column
* @param partition partition
*/
- public void addForBatchAddition(String storageTable, UpdatePeriod updatePeriod, String partitionColumn,
- String partition) {
+ public void addForBatchAddition(String timeLineKey, String storageTableName, UpdatePeriod updatePeriod,
+ String partitionColumn, String partition) {
try {
- ensureEntry(storageTable, updatePeriod, partitionColumn).addForBatchAddition(TimePartition.of(updatePeriod,
- partition));
+ ensureEntry(timeLineKey, storageTableName, updatePeriod, partitionColumn)
+ .addForBatchAddition(TimePartition.of(updatePeriod, partition));
} catch (LensException e) {
// to take care of the case where partition name is something like `latest`
log.error("Couldn't parse partition: {} with update period: {}, skipping.", partition, updatePeriod, e);
@@ -427,42 +471,18 @@ public class CubeMetastoreClient {
* <p></p>
* kind of like mkdir -p
*
- * @param storageTable storage table
+ * @param timeLineKey storage table
* @param updatePeriod update period
* @param partitionColumn partition column
* @return timeline if already exists, or puts a new timeline and returns.
*/
- public PartitionTimeline ensureEntry(String storageTable, UpdatePeriod updatePeriod, String partitionColumn) {
- if (get(storageTable) == null) {
- put(storageTable, new TreeMap<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>>());
- }
- if (get(storageTable).get(updatePeriod) == null) {
- get(storageTable).put(updatePeriod, new CaseInsensitiveStringHashMap<PartitionTimeline>());
- }
- if (get(storageTable).get(updatePeriod).get(partitionColumn) == null) {
- get(storageTable).get(updatePeriod).put(partitionColumn, PartitionTimelineFactory.get(
- CubeMetastoreClient.this, storageTable, updatePeriod, partitionColumn));
- }
- return get(storageTable).get(updatePeriod).get(partitionColumn);
- }
-
- /**
- * commit all batch addition for all its timelines.
- *
- * @param storageTable storage table
- * @throws HiveException
- * @throws LensException
- */
- public void commitAllBatchAdditions(String storageTable) throws HiveException, LensException {
- if (get(storageTable) != null) {
- for (UpdatePeriod updatePeriod : get(storageTable).keySet()) {
- for (String partCol : get(storageTable).get(updatePeriod).keySet()) {
- PartitionTimeline timeline = get(storageTable).get(updatePeriod).get(partCol);
- timeline.commitBatchAdditions();
- }
- }
- alterTablePartitionCache(storageTable);
- }
+ public PartitionTimeline ensureEntry(String timeLineKey, String storagTableName, UpdatePeriod updatePeriod,
+ String partitionColumn) {
+ return this
+ .computeIfAbsent(timeLineKey, s -> new TreeMap<>())
+ .computeIfAbsent(updatePeriod, k -> new CaseInsensitiveStringHashMap<>())
+ .computeIfAbsent(partitionColumn, c -> PartitionTimelineFactory.get(
+ CubeMetastoreClient.this, storagTableName, updatePeriod, c));
}
/** check partition existence in the appropriate timeline if it exists */
@@ -478,9 +498,11 @@ public class CubeMetastoreClient {
*/
public PartitionTimeline get(String fact, String storage, UpdatePeriod updatePeriod, String partCol)
throws HiveException, LensException {
- return get(fact, storage) != null && get(fact, storage).get(updatePeriod) != null && get(fact, storage).get(
- updatePeriod).get(partCol) != null ? get(fact, storage).get(updatePeriod).get(partCol) : null;
+ return get(fact, storage) != null && get(fact, storage).get(updatePeriod) != null
+ && get(fact, storage).get(updatePeriod).get(partCol) != null ? get(fact, storage).get(updatePeriod)
+ .get(partCol) : null;
}
+
/**
* returns the timeline corresponding to fact-storage table, updatePeriod, partCol. throws exception if not
* exists, which would most probably mean the combination is incorrect.
@@ -489,8 +511,8 @@ public class CubeMetastoreClient {
throws HiveException, LensException {
PartitionTimeline timeline = get(fact, storage, updatePeriod, partCol);
if (timeline == null) {
- throw new LensException(LensCubeErrorCode.TIMELINE_ABSENT.getLensErrorInfo(),
- fact, storage, updatePeriod, partCol);
+ throw new LensException(LensCubeErrorCode.TIMELINE_ABSENT.getLensErrorInfo(), fact, storage, updatePeriod,
+ partCol);
}
return timeline;
}
@@ -519,8 +541,8 @@ public class CubeMetastoreClient {
boolean updated = false;
for (Map.Entry<String, Date> entry : timePartSpec.entrySet()) {
TimePartition part = TimePartition.of(updatePeriod, entry.getValue());
- if (!partitionExistsByFilter(cubeTableName, storageName, StorageConstants.getPartFilter(entry.getKey(),
- part.getDateString()))) {
+ if (!partitionExistsByFilter(cubeTableName, storageName, updatePeriod,
+ StorageConstants.getPartFilter(entry.getKey(), part.getDateString()))) {
get(cubeTableName, storageName, updatePeriod, entry.getKey()).drop(part);
updated = true;
}
@@ -565,10 +587,10 @@ public class CubeMetastoreClient {
Hive.closeCurrent();
}
- private void createOrAlterStorageHiveTable(Table parent, String storage, StorageTableDesc crtTblDesc)
+ private void createOrAlterStorageHiveTable(Table parent, String storageTableNamePrefix, StorageTableDesc crtTblDesc)
throws LensException {
try {
- Table tbl = getStorage(storage).getStorageTable(getClient(), parent, crtTblDesc);
+ Table tbl = Storage.getStorageTable(storageTableNamePrefix, getClient(), parent, crtTblDesc);
if (tableExists(tbl.getTableName())) {
// alter table
alterHiveTable(tbl.getTableName(), tbl);
@@ -730,7 +752,7 @@ public class CubeMetastoreClient {
* @param storageAggregatePeriods Aggregate periods for the storages
* @param weight Weight of the cube
* @param properties Properties of fact table
- * @param storageTableDescs Map of storage to its storage table description
+ * @param storageTableDescs Map of storage table prefix to its storage table description
* @throws LensException
*/
public void createCubeFactTable(String cubeName, String factName, List<FieldSchema> columns,
@@ -808,7 +830,7 @@ public class CubeMetastoreClient {
* Create cube table defined and create all the corresponding storage tables
*
* @param cubeTable Can be fact or dimension table
- * @param storageTableDescs Map of storage to its storage table description
+ * @param storageTableDescs Map of storage tableName prefix to its storage table description
* @throws LensException
*/
public void createCubeTable(AbstractCubeTable cubeTable, Map<String, StorageTableDesc> storageTableDescs)
@@ -836,14 +858,17 @@ public class CubeMetastoreClient {
* @param fact The CubeFactTable
* @param storage The storage
* @param updatePeriods Update periods of the fact on the storage
- * @param storageTableDesc The storage table description
+ * @param storageTableDescs The storage table description
* @throws LensException
*/
public void addStorage(CubeFactTable fact, String storage, Set<UpdatePeriod> updatePeriods,
- StorageTableDesc storageTableDesc) throws LensException {
- fact.addStorage(storage, updatePeriods);
- createOrAlterStorageHiveTable(getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT),
- storage, storageTableDesc);
+ Map<String, StorageTableDesc> storageTableDescs, Map<UpdatePeriod, String> updatePeriodStoragePrefix)
+ throws LensException {
+ fact.addStorage(storage, updatePeriods, updatePeriodStoragePrefix);
+ for (Map.Entry entry : storageTableDescs.entrySet()) {
+ createOrAlterStorageHiveTable(getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT),
+ (String) entry.getKey(), (StorageTableDesc) entry.getValue());
+ }
alterCubeTable(fact.getName(), getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT), fact);
updateFactCache(fact.getName());
}
@@ -860,8 +885,8 @@ public class CubeMetastoreClient {
public void addStorage(CubeDimensionTable dim, String storage, UpdatePeriod dumpPeriod,
StorageTableDesc storageTableDesc) throws LensException {
dim.alterSnapshotDumpPeriod(storage, dumpPeriod);
- createOrAlterStorageHiveTable(getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE),
- storage, storageTableDesc);
+ createOrAlterStorageHiveTable(getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE), storage,
+ storageTableDesc);
alterCubeTable(dim.getName(), getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE), dim);
updateDimCache(dim.getName());
}
@@ -896,10 +921,19 @@ public class CubeMetastoreClient {
return partsAdded;
}
+ /**
+ * @param factOrDimTable
+ * @param storageName
+ * @param updatePeriod
+ * @param storagePartitionDescs
+ * @param type
+ * @return
+ * @throws HiveException
+ * @throws LensException
+ */
private List<Partition> addPartitions(String factOrDimTable, String storageName, UpdatePeriod updatePeriod,
List<StoragePartitionDesc> storagePartitionDescs, CubeTableType type) throws HiveException, LensException {
- String storageTableName = getStorageTableName(factOrDimTable.trim(),
- Storage.getPrefix(storageName.trim())).toLowerCase();
+ String storageTableName = getStorageTableName(factOrDimTable, storageName, updatePeriod);
if (type == CubeTableType.DIM_TABLE) {
// Adding partition in dimension table.
Map<Map<String, String>, LatestInfo> latestInfos = Maps.newHashMap();
@@ -910,7 +944,7 @@ public class CubeMetastoreClient {
}
List<Partition> partsAdded =
getStorage(storageName).addPartitions(getClient(), factOrDimTable, updatePeriod, storagePartitionDescs,
- latestInfos);
+ latestInfos, storageTableName);
ListIterator<Partition> iter = partsAdded.listIterator();
while (iter.hasNext()) {
if (iter.next().getSpec().values().contains(StorageConstants.LATEST_PARTITION_VALUE)) {
@@ -928,10 +962,11 @@ public class CubeMetastoreClient {
// Adding partition in fact table.
if (storagePartitionDescs.size() > 0) {
partsAdded = getStorage(storageName).addPartitions(getClient(), factOrDimTable, updatePeriod,
- storagePartitionDescs, null);
+ storagePartitionDescs, null, storageTableName);
}
// update hive table
- alterTablePartitionCache(getStorageTableName(factOrDimTable, Storage.getPrefix(storageName)));
+ alterTablePartitionCache((Storage.getPrefix(storageName) + factOrDimTable).toLowerCase(), updatePeriod,
+ storageTableName);
return partsAdded;
} else {
throw new LensException("Can't add partitions to anything other than fact or dimtable");
@@ -1018,20 +1053,20 @@ public class CubeMetastoreClient {
}
/**
- * store back all timelines of given storage table to table properties
+ * store back all timelines of given storage to table properties
*
- * @param storageTableName storage table name
+ * @param timeLineKey key for the time line
+ * @param storageTableName Storage table name
* @throws HiveException
*/
- private void alterTablePartitionCache(String storageTableName) throws HiveException, LensException {
+ private void alterTablePartitionCache(String timeLineKey, UpdatePeriod updatePeriod, String storageTableName)
+ throws HiveException, LensException {
Table table = getTable(storageTableName);
Map<String, String> params = table.getParameters();
- if (partitionTimelineCache.get(storageTableName) != null) {
- for (UpdatePeriod updatePeriod : partitionTimelineCache.get(storageTableName).keySet()) {
- for (Map.Entry<String, PartitionTimeline> entry : partitionTimelineCache.get(storageTableName)
- .get(updatePeriod).entrySet()) {
- entry.getValue().updateTableParams(table);
- }
+ if (partitionTimelineCache.get(timeLineKey) != null) {
+ for (Map.Entry<String, PartitionTimeline> entry : partitionTimelineCache.get(timeLineKey).get(updatePeriod)
+ .entrySet()) {
+ entry.getValue().updateTableParams(table);
}
params.put(getPartitionTimelineCachePresenceKey(), "true");
alterHiveTable(storageTableName, table);
@@ -1173,8 +1208,7 @@ public class CubeMetastoreClient {
*/
public void dropPartition(String cubeTableName, String storageName, Map<String, Date> timePartSpec,
Map<String, String> nonTimePartSpec, UpdatePeriod updatePeriod) throws HiveException, LensException {
- String storageTableName = getStorageTableName(cubeTableName.trim(),
- Storage.getPrefix(storageName.trim())).toLowerCase();
+ String storageTableName = getStorageTableName(cubeTableName.trim(), storageName, updatePeriod);
Table hiveTable = getHiveTable(storageTableName);
List<FieldSchema> partCols = hiveTable.getPartCols();
List<String> partColNames = new ArrayList<>(partCols.size());
@@ -1244,7 +1278,8 @@ public class CubeMetastoreClient {
// dropping fact partition
getStorage(storageName).dropPartition(getClient(), storageTableName, partVals, null, null);
if (partitionTimelineCache.updateForDeletion(cubeTableName, storageName, updatePeriod, timePartSpec)) {
- this.alterTablePartitionCache(storageTableName);
+ this.alterTablePartitionCache((Storage.getPrefix(storageName) + cubeTableName).toLowerCase(), updatePeriod,
+ storageTableName);
}
}
}
@@ -1277,7 +1312,7 @@ public class CubeMetastoreClient {
public boolean factPartitionExists(String factName, String storageName, UpdatePeriod updatePeriod,
Map<String, Date> partitionTimestamp,
Map<String, String> partSpec) throws HiveException, LensException {
- String storageTableName = getFactOrDimtableStorageTableName(factName, storageName);
+ String storageTableName = getStorageTableName(factName, storageName, updatePeriod);
return partitionExists(storageTableName, updatePeriod, partitionTimestamp, partSpec);
}
@@ -1286,9 +1321,9 @@ public class CubeMetastoreClient {
return partitionExists(storageTableName, getPartitionSpec(updatePeriod, partitionTimestamps));
}
- public boolean partitionExistsByFilter(String cubeTableName, String storageName, String filter) throws LensException {
- return partitionExistsByFilter(getStorageTableName(cubeTableName, Storage.getPrefix(storageName)),
- filter);
+ public boolean partitionExistsByFilter(String cubeTableName, String storageName, UpdatePeriod updatePeriod,
+ String filter) throws LensException {
+ return partitionExistsByFilter(getStorageTableName(cubeTableName, storageName, updatePeriod), filter);
}
public boolean partitionExistsByFilter(String storageTableName, String filter) throws LensException {
@@ -1354,7 +1389,7 @@ public class CubeMetastoreClient {
boolean latestPartitionExists(String factOrDimTblName, String storageName, String latestPartCol)
throws HiveException, LensException {
- String storageTableName = getStorageTableName(factOrDimTblName, Storage.getPrefix(storageName));
+ String storageTableName = MetastoreUtil.getStorageTableName(factOrDimTblName, Storage.getPrefix(storageName));
if (isDimensionTable(factOrDimTblName)) {
return dimTableLatestPartitionExists(storageTableName);
} else {
@@ -2225,18 +2260,30 @@ public class CubeMetastoreClient {
*/
public void dropStorageFromFact(String factName, String storage) throws LensException {
CubeFactTable cft = getFactTable(factName);
+ dropHiveTablesForStorage(factName, storage);
cft.dropStorage(storage);
- dropHiveTable(getFactOrDimtableStorageTableName(factName, storage));
alterCubeTable(factName, getTableWithTypeFailFast(factName, CubeTableType.FACT), cft);
updateFactCache(factName);
}
+ private void dropHiveTablesForStorage(String factName, String storage) throws LensException{
+ CubeFactTable cft = getFactTable(factName);
+ Set<String> droppedTables = new HashSet<>();
+ for (Map.Entry updatePeriodEntry : cft.getStoragePrefixUpdatePeriodMap().get(storage).entrySet()) {
+ UpdatePeriod updatePeriod = (UpdatePeriod) updatePeriodEntry.getKey();
+ String storageTableName = getStorageTableName(factName, storage, updatePeriod);
+ if (!droppedTables.contains(storageTableName)) {
+ dropHiveTable(storageTableName);
+ }
+ droppedTables.add(storageTableName);
+ }
+ }
// updateFact will be false when fact is fully dropped
private void dropStorageFromFact(String factName, String storage, boolean updateFact)
throws LensException {
- CubeFactTable cft = getFactTable(factName);
- dropHiveTable(getFactOrDimtableStorageTableName(factName, storage));
+ dropHiveTablesForStorage(factName, storage);
if (updateFact) {
+ CubeFactTable cft = getFactTable(factName);
cft.dropStorage(storage);
alterCubeTable(factName, getTableWithTypeFailFast(factName, CubeTableType.FACT), cft);
updateFactCache(factName);
@@ -2432,4 +2479,22 @@ public class CubeMetastoreClient {
Date now = new Date();
return isStorageTableCandidateForRange(storageTableName, resolveDate(fromDate, now), resolveDate(toDate, now));
}
+
+ private String getStorageTablePrefixFromStorage(String factOrDimTableName, String storage, UpdatePeriod updatePeriod)
+ throws LensException {
+ if (updatePeriod == null) {
+ return storage;
+ }
+ if (isFactTable(factOrDimTableName)) {
+ return getFactTable(factOrDimTableName).getTablePrefix(storage, updatePeriod);
+ } else {
+ return storage;
+ }
+ }
+
+ public String getStorageTableName(String factOrDimTableName, String storage, UpdatePeriod updatePeriod)
+ throws LensException {
+ return MetastoreUtil.getFactOrDimtableStorageTableName(factOrDimTableName,
+ getStorageTablePrefixFromStorage(factOrDimTableName, storage, updatePeriod));
+ }
}
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/DateUtil.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/DateUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/DateUtil.java
index 7717081..d10d72e 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/DateUtil.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/DateUtil.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Stream;
import org.apache.lens.cube.error.LensCubeErrorCode;
import org.apache.lens.server.api.error.LensException;
@@ -305,11 +306,11 @@ public final class DateUtil {
switch (interval) {
case SECONDLY:
case CONTINUOUS:
- return getMilliSecondCoveringInfo(from, to, 1000);
+ return getMilliSecondCoveringInfo(from, to, 1000, interval);
case MINUTELY:
case HOURLY:
case DAILY:
- return getMilliSecondCoveringInfo(from, to, interval.weight());
+ return getMilliSecondCoveringInfo(from, to, interval.weight(), interval);
case WEEKLY:
return getWeeklyCoveringInfo(from, to);
case MONTHLY:
@@ -323,18 +324,25 @@ public final class DateUtil {
}
}
- private static CoveringInfo getMilliSecondCoveringInfo(Date from, Date to, long millisInInterval) {
+ private static CoveringInfo getMilliSecondCoveringInfo(Date from, Date to, long millisInInterval, UpdatePeriod interval) {
long diff = to.getTime() - from.getTime();
- return new CoveringInfo((int) (diff / millisInInterval), diff % millisInInterval == 0);
+ return new CoveringInfo((int) (diff / millisInInterval),
+ Stream.of(from, to).allMatch(a->interval.truncate(a).equals(a)));
+ // start date and end date should lie on boundaries.
}
+ /**
+ * Whether the range [from,to) is coverable by intervals
+ * @param from from time
+ * @param to to time
+ * @param intervals intervals to check
+ * @return true if any of the intervals can completely cover the range
+ */
static boolean isCoverableBy(Date from, Date to, Set<UpdatePeriod> intervals) {
- for (UpdatePeriod period : intervals) {
- if (getCoveringInfo(from, to, period).isCoverable()) {
- return true;
- }
- }
- return false;
+ return intervals.stream().anyMatch(period->isCoverableBy(from, to, period));
+ }
+ private static boolean isCoverableBy(Date from, Date to, UpdatePeriod period) {
+ return getCoveringInfo(from, to, period).isCoverable();
}
public static int getTimeDiff(Date fromDate, Date toDate, UpdatePeriod updatePeriod) {
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java
index 1694b80..b90b569 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java
@@ -64,7 +64,10 @@ public class FactPartition implements Comparable<FactPartition> {
this.storageTables.addAll(storageTables);
}
}
-
+ public FactPartition withoutContaining() {
+ return new FactPartition(this.getPartCol(), this.getPartSpec(), this.getPeriod(), null, this
+ .getPartFormat(), this.getStorageTables());
+ }
public FactPartition(String partCol, TimePartition timePartition) {
this(partCol, timePartition, null, null);
}
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java
index 53cf8af..57d4502 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java
@@ -590,4 +590,10 @@ public class MetastoreUtil {
}
return copy;
}
+
+ public static String getUpdatePeriodStoragePrefixKey(String factTableName , String storageName, String updatePeriod) {
+ return MetastoreUtil.getFactKeyPrefix(factTableName) + "." + storageName + "." + updatePeriod;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java
index cd9f705..936add4 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java
@@ -124,14 +124,18 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta
/**
* Get the storage table descriptor for the given parent table.
*
+ * @param storageTableNamePrefix Storage table prefix based on update period
* @param client The metastore client
* @param parent Is either Fact or Dimension table
* @param crtTbl Create table info
* @return Table describing the storage table
* @throws HiveException
*/
- public Table getStorageTable(Hive client, Table parent, StorageTableDesc crtTbl) throws HiveException {
- String storageTableName = MetastoreUtil.getStorageTableName(parent.getTableName(), this.getPrefix());
+ public static Table getStorageTable(String storageTableNamePrefix, Hive client, Table parent, StorageTableDesc crtTbl)
+ throws HiveException {
+ // Change it to the appropriate storage table name.
+ String storageTableName = MetastoreUtil
+ .getStorageTableName(parent.getTableName(), Storage.getPrefix(storageTableNamePrefix));
Table tbl = client.getTable(storageTableName, false);
if (tbl == null) {
tbl = client.newTable(storageTableName);
@@ -235,21 +239,6 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta
}
/**
- * Add single partition to storage. Just calls #addPartitions.
- * @param client
- * @param addPartitionDesc
- * @param latestInfo
- * @throws HiveException
- */
- public List<Partition> addPartition(Hive client, StoragePartitionDesc addPartitionDesc, LatestInfo latestInfo)
- throws HiveException {
- Map<Map<String, String>, LatestInfo> latestInfos = Maps.newHashMap();
- latestInfos.put(addPartitionDesc.getNonTimePartSpec(), latestInfo);
- return addPartitions(client, addPartitionDesc.getCubeTableName(), addPartitionDesc.getUpdatePeriod(),
- Collections.singletonList(addPartitionDesc), latestInfos);
- }
-
- /**
* Add given partitions in the underlying hive table and update latest partition links
*
* @param client hive client instance
@@ -262,12 +251,11 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta
*/
public List<Partition> addPartitions(Hive client, String factOrDimTable, UpdatePeriod updatePeriod,
List<StoragePartitionDesc> storagePartitionDescs,
- Map<Map<String, String>, LatestInfo> latestInfos) throws HiveException {
+ Map<Map<String, String>, LatestInfo> latestInfos, String tableName) throws HiveException {
preAddPartitions(storagePartitionDescs);
Map<Map<String, String>, Map<String, Integer>> latestPartIndexForPartCols = Maps.newHashMap();
boolean success = false;
try {
- String tableName = MetastoreUtil.getStorageTableName(factOrDimTable, this.getPrefix());
String dbName = SessionState.get().getCurrentDatabase();
AddPartitionDesc addParts = new AddPartitionDesc(dbName, tableName, true);
Table storageTbl = client.getTable(dbName, tableName);
@@ -383,11 +371,11 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta
* @throws InvalidOperationException
* @throws HiveException
*/
- public void updatePartitions(Hive client, String fact, List<Partition> partitions)
+ public void updatePartitions(String storageTable, Hive client, String fact, List<Partition> partitions)
throws InvalidOperationException, HiveException {
boolean success = false;
try {
- client.alterPartitions(MetastoreUtil.getFactOrDimtableStorageTableName(fact, getName()), partitions, null);
+ client.alterPartitions(storageTable, partitions, null);
success = true;
} finally {
if (success) {
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java
index bf6cc5c..5bdbf74 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java
@@ -22,6 +22,7 @@ import static org.apache.lens.cube.metadata.DateUtil.ABSDATE_PARSER;
import java.util.Calendar;
import java.util.Date;
+import java.util.Set;
import java.util.TreeSet;
import org.apache.lens.cube.error.LensCubeErrorCode;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import lombok.Builder;
import lombok.Data;
import lombok.Getter;
@@ -48,10 +50,24 @@ public class TimeRange {
private ASTNode parent;
private int childIndex;
- public boolean isCoverableBy(TreeSet<UpdatePeriod> updatePeriods) {
+ public boolean isCoverableBy(Set<UpdatePeriod> updatePeriods) {
return DateUtil.isCoverableBy(fromDate, toDate, updatePeriods);
}
+ /**
+ * Truncate time range using the update period.
+ * The lower value of the truncated time range is the smallest date value equal to or larger than original
+ * time range's lower value which lies at the update period's boundary. Similarly for higher value.
+ * @param updatePeriod Update period to truncate time range with
+ * @return truncated time range
+ * @throws LensException If the truncated time range is invalid.
+ */
+ public TimeRange truncate(UpdatePeriod updatePeriod) throws LensException {
+ TimeRange timeRange = new TimeRangeBuilder().partitionColumn(partitionColumn)
+ .fromDate(updatePeriod.getCeilDate(fromDate)).toDate(updatePeriod.getFloorDate(toDate)).build();
+ timeRange.validate();
+ return timeRange;
+ }
public static class TimeRangeBuilder {
private final TimeRange range;
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/parse/AbridgedTimeRangeWriter.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/AbridgedTimeRangeWriter.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/AbridgedTimeRangeWriter.java
index 8681e90..3916a48 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/AbridgedTimeRangeWriter.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/AbridgedTimeRangeWriter.java
@@ -19,7 +19,12 @@
package org.apache.lens.cube.parse;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.util.Optional.ofNullable;
+import static java.util.stream.Collectors.toMap;
+
import java.util.*;
+import java.util.stream.Collectors;
import org.apache.lens.cube.metadata.FactPartition;
import org.apache.lens.server.api.error.LensException;
@@ -33,14 +38,13 @@ import com.google.common.collect.Sets;
* Collapses the time range filters using IN operators
*/
public class AbridgedTimeRangeWriter implements TimeRangeWriter {
- //TODO: minimize use of String, use StringBuilders
/**
* Return IN clause for the partitions selected in the cube query
*
- * @param cubeQueryContext
- * @param tableName
- * @param parts
+ * @param cubeQueryContext cube query context
+ * @param tableName table name
+ * @param parts partitions
* @return
* @throws LensException
*/
@@ -80,7 +84,7 @@ public class AbridgedTimeRangeWriter implements TimeRangeWriter {
for (FactPartition factPartition : parts) {
String filter = TimeRangeUtils.getTimeRangePartitionFilter(factPartition, cubeQueryContext, tableName);
if (filter.contains("AND")) {
- allTimeRangeFilters.add(new StringBuilder("(").append(filter).append(")").toString());
+ allTimeRangeFilters.add("(" + filter + ")");
} else {
extractColumnAndCondition(filter, partFilterMap);
}
@@ -89,7 +93,7 @@ public class AbridgedTimeRangeWriter implements TimeRangeWriter {
List<String> inClauses = new ArrayList<String>(partFilterMap.size());
for (String column : partFilterMap.keySet()) {
String clause =
- new StringBuilder("(").append(StringUtils.join(partFilterMap.get(column), ",")).append(")").toString();
+ "(" + StringUtils.join(partFilterMap.get(column), ",") + ")";
inClauses.add(column + " IN " + clause);
}
@@ -120,29 +124,17 @@ public class AbridgedTimeRangeWriter implements TimeRangeWriter {
private Map<Set<FactPartition>, Set<FactPartition>> groupPartitions(Collection<FactPartition> parts) {
Map<FactPartition, Set<FactPartition>> partitionSetMap = new HashMap<FactPartition, Set<FactPartition>>();
for (FactPartition part : parts) {
- FactPartition key = part.getContainingPart();
- FactPartition part2 = new FactPartition(part.getPartCol(), part.getPartSpec(), part.getPeriod(), null, part
- .getPartFormat(), part.getStorageTables());
- if (partitionSetMap.get(key) == null) {
- partitionSetMap.put(key, Sets.<FactPartition>newTreeSet());
- }
- partitionSetMap.get(key).add(part2);
+ partitionSetMap.computeIfAbsent(part.getContainingPart(), k -> Sets.newTreeSet()).add(part.withoutContaining());
}
Map<Set<FactPartition>, Set<FactPartition>> setSetOppositeMap = Maps.newHashMap();
for (Map.Entry<FactPartition, Set<FactPartition>> entry : partitionSetMap.entrySet()) {
- if (setSetOppositeMap.get(entry.getValue()) == null) {
- setSetOppositeMap.put(entry.getValue(), Sets.<FactPartition>newTreeSet());
- }
+ setSetOppositeMap.computeIfAbsent(entry.getValue(), k -> Sets.newTreeSet());
if (entry.getKey() != null) {
setSetOppositeMap.get(entry.getValue()).add(entry.getKey());
}
}
-
- Map<Set<FactPartition>, Set<FactPartition>> setSetMap = Maps.newHashMap();
- for (Map.Entry<Set<FactPartition>, Set<FactPartition>> entry : setSetOppositeMap.entrySet()) {
- setSetMap.put(entry.getValue(), entry.getKey());
- }
- return setSetMap;
+ // inverse again
+ return setSetOppositeMap.entrySet().stream().collect(toMap(Map.Entry::getValue, Map.Entry::getKey));
}
// This takes the output of filter generated by TimeRangeUtils.getTimeRangePartitionFilter
@@ -156,13 +148,6 @@ public class AbridgedTimeRangeWriter implements TimeRangeWriter {
String column = subTokens[0].trim();
String filterValue = subTokens[1].trim();
- List<String> filterValues = partFilterMap.get(column);
-
- if (filterValues == null) {
- filterValues = new ArrayList<String>();
- partFilterMap.put(column, filterValues);
- }
-
- filterValues.add(filterValue);
+ partFilterMap.computeIfAbsent(column, k -> new ArrayList<>()).add(filterValue);
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/parse/BetweenTimeRangeWriter.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/BetweenTimeRangeWriter.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/BetweenTimeRangeWriter.java
index c8b8129..bd77498 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/BetweenTimeRangeWriter.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/BetweenTimeRangeWriter.java
@@ -92,7 +92,7 @@ public class BetweenTimeRangeWriter implements TimeRangeWriter {
}
String partCol = start.getPartCol();
- if (cubeQueryContext != null && !cubeQueryContext.shouldReplaceTimeDimWithPart()) {
+ if (!cubeQueryContext.shouldReplaceTimeDimWithPart()) {
partCol = cubeQueryContext.getTimeDimOfPartitionColumn(partCol);
}
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java
index c36ce70..0b7d400 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -34,11 +34,6 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CandidateCoveringSetsResolver implements ContextRewriter {
- private final Configuration conf;
- public CandidateCoveringSetsResolver(Configuration conf) {
- this.conf = conf;
- }
-
@Override
public void rewriteContext(CubeQueryContext cubeql) throws LensException {
@@ -99,8 +94,7 @@ public class CandidateCoveringSetsResolver implements ContextRewriter {
private void updateFinalCandidates(List<List<Candidate>> joinCandidates, CubeQueryContext cubeql) {
List<Candidate> finalCandidates = new ArrayList<>();
- for (Iterator<List<Candidate>> itr = joinCandidates.iterator(); itr.hasNext();) {
- List<Candidate> joinCandidate = itr.next();
+ for (List<Candidate> joinCandidate : joinCandidates) {
if (joinCandidate.size() == 1) {
finalCandidates.add(joinCandidate.iterator().next());
} else {
@@ -112,8 +106,7 @@ public class CandidateCoveringSetsResolver implements ContextRewriter {
}
private boolean isCandidateCoveringTimeRanges(UnionCandidate uc, List<TimeRange> ranges) {
- for (Iterator<TimeRange> itr = ranges.iterator(); itr.hasNext();) {
- TimeRange range = itr.next();
+ for (TimeRange range : ranges) {
if (!CandidateUtil.isTimeRangeCovered(uc.getChildren(), range.getFromDate(), range.getToDate())) {
return false;
}
@@ -134,7 +127,7 @@ public class CandidateCoveringSetsResolver implements ContextRewriter {
private List<Candidate> resolveTimeRangeCoveringFactSet(CubeQueryContext cubeql,
Set<QueriedPhraseContext> queriedMsrs, List<QueriedPhraseContext> qpcList) throws LensException {
// All Candidates
- List<Candidate> allCandidates = new ArrayList<Candidate>(cubeql.getCandidates());
+ List<Candidate> allCandidates = new ArrayList<>(cubeql.getCandidates());
// Partially valid candidates
List<Candidate> allCandidatesPartiallyValid = new ArrayList<>();
List<Candidate> candidateSet = new ArrayList<>();
@@ -144,7 +137,6 @@ public class CandidateCoveringSetsResolver implements ContextRewriter {
StorageCandidate sc = (StorageCandidate) cand;
if (CandidateUtil.isValidForTimeRanges(sc, cubeql.getTimeRanges())) {
candidateSet.add(CandidateUtil.cloneStorageCandidate(sc));
- continue;
} else if (CandidateUtil.isPartiallyValidForTimeRanges(sc, cubeql.getTimeRanges())) {
allCandidatesPartiallyValid.add(CandidateUtil.cloneStorageCandidate(sc));
} else {
@@ -157,9 +149,9 @@ public class CandidateCoveringSetsResolver implements ContextRewriter {
}
// Get all covering fact sets
List<UnionCandidate> unionCoveringSet =
- getCombinations(new ArrayList<Candidate>(allCandidatesPartiallyValid), cubeql);
+ getCombinations(new ArrayList<>(allCandidatesPartiallyValid), cubeql);
// Sort the Collection based on no of elements
- Collections.sort(unionCoveringSet, new CandidateUtil.ChildrenSizeBasedCandidateComparator<UnionCandidate>());
+ unionCoveringSet.sort(new CandidateUtil.ChildrenSizeBasedCandidateComparator<UnionCandidate>());
// prune non covering sets
pruneUnionCandidatesNotCoveringAllRanges(unionCoveringSet, cubeql);
// prune candidate set which doesn't contain any common measure i
@@ -218,14 +210,13 @@ public class CandidateCoveringSetsResolver implements ContextRewriter {
}
}
- public List<UnionCandidate> getCombinations(final List<Candidate> candidates, CubeQueryContext cubeql) {
- int aliasCounter = 0;
- List<UnionCandidate> combinations = new LinkedList<UnionCandidate>();
+ private List<UnionCandidate> getCombinations(final List<Candidate> candidates, CubeQueryContext cubeql) {
+ List<UnionCandidate> combinations = new LinkedList<>();
int size = candidates.size();
int threshold = Double.valueOf(Math.pow(2, size)).intValue() - 1;
for (int i = 1; i <= threshold; ++i) {
- LinkedList<Candidate> individualCombinationList = new LinkedList<Candidate>();
+ LinkedList<Candidate> individualCombinationList = new LinkedList<>();
int count = size - 1;
int clonedI = i;
while (count >= 0) {
@@ -249,7 +240,7 @@ public class CandidateCoveringSetsResolver implements ContextRewriter {
boolean evaluable = false;
Candidate uc = i.next();
for (QueriedPhraseContext msr : msrs) {
- evaluable = isMeasureAnswerablebyUnionCandidate(msr, uc, cubeql) ? true : false;
+ evaluable = isMeasureAnswerablebyUnionCandidate(msr, uc, cubeql);
if (!evaluable) {
break;
}
@@ -265,18 +256,18 @@ public class CandidateCoveringSetsResolver implements ContextRewriter {
// Sets that contain all measures or no measures are removed from iteration.
// find other facts
for (Iterator<Candidate> i = ucSet.iterator(); i.hasNext();) {
- Candidate uc = i.next();
+ Candidate candidate = i.next();
i.remove();
// find the remaining measures in other facts
if (i.hasNext()) {
Set<QueriedPhraseContext> remainingMsrs = new HashSet<>(msrs);
- Set<QueriedPhraseContext> coveredMsrs = CandidateUtil.coveredMeasures(uc, msrs, cubeql);
+ Set<QueriedPhraseContext> coveredMsrs = CandidateUtil.coveredMeasures(candidate, msrs, cubeql);
remainingMsrs.removeAll(coveredMsrs);
List<List<Candidate>> coveringSets = resolveJoinCandidates(ucSet, remainingMsrs, cubeql);
if (!coveringSets.isEmpty()) {
for (List<Candidate> candSet : coveringSets) {
- candSet.add(uc);
+ candSet.add(candidate);
msrCoveringSets.add(candSet);
}
} else {
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java
index 5863c1c..168dcc6 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java
@@ -73,6 +73,5 @@ public interface CandidateTable {
/**
* Get partitions queried
*/
- //TODO union: Name changed
Set<?> getParticipatingPartitions();
}
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java
index c7f2047..6cb18e6 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,9 +18,13 @@
*/
package org.apache.lens.cube.parse;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.partition;
+import static java.util.stream.Collectors.toSet;
import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.*;
import java.util.*;
+import java.util.stream.Stream;
import org.apache.lens.cube.metadata.TimeRange;
@@ -52,12 +56,8 @@ public class CandidateTablePruneCause {
"present in any table",
};
} else {
- List<List<String>> columnSets = new ArrayList<List<String>>();
- for (CandidateTablePruneCause cause : causes) {
- columnSets.add(cause.getMissingColumns());
- }
return new String[]{
- "Column Sets: " + columnSets,
+ "Column Sets: " + causes.stream().map(CandidateTablePruneCause::getMissingColumns).collect(toSet()),
"queriable together",
};
}
@@ -87,12 +87,9 @@ public class CandidateTablePruneCause {
STORAGE_NOT_AVAILABLE_IN_RANGE("No storages available for all of these time ranges: %s") {
@Override
Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) {
- Set<TimeRange> allRanges = Sets.newHashSet();
- for (CandidateTablePruneCause cause : causes) {
- allRanges.addAll(cause.getInvalidRanges());
- }
return new Object[]{
- allRanges.toString(),
+ causes.stream().map(CandidateTablePruneCause::getInvalidRanges).flatMap(Collection::stream)
+ .collect(toSet()).toString(),
};
}
},
@@ -108,11 +105,10 @@ public class CandidateTablePruneCause {
// expression is not evaluable in the candidate
EXPRESSION_NOT_EVALUABLE("%s expressions not evaluable") {
Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) {
- List<String> columns = new ArrayList<String>();
- for (CandidateTablePruneCause cause : causes) {
- columns.addAll(cause.getMissingExpressions());
- }
- return new String[]{columns.toString()};
+ return new String[]{
+ causes.stream().map(CandidateTablePruneCause::getMissingExpressions).flatMap(Collection::stream)
+ .collect(toSet()).toString()
+ };
}
},
// column not valid in cube table. Commented the below line as it's not being used in master.
@@ -126,12 +122,8 @@ public class CandidateTablePruneCause {
"present in any table",
};
} else {
- List<List<String>> columnSets = new ArrayList<List<String>>();
- for (CandidateTablePruneCause cause : causes) {
- columnSets.add(cause.getMissingColumns());
- }
return new String[]{
- "Column Sets: " + columnSets,
+ "Column Sets: " + causes.stream().map(CandidateTablePruneCause::getMissingColumns).collect(toSet()),
"queriable together",
};
}
@@ -146,17 +138,13 @@ public class CandidateTablePruneCause {
TIMEDIM_NOT_SUPPORTED("Queried data not available for time dimensions: %s") {
@Override
Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) {
- Set<String> dims = Sets.newHashSet();
- for(CandidateTablePruneCause cause: causes){
- dims.addAll(cause.getUnsupportedTimeDims());
- }
return new Object[]{
- dims.toString(),
+ causes.stream().map(CandidateTablePruneCause::getUnsupportedTimeDims).flatMap(Collection::stream)
+ .collect(toSet()).toString(),
};
}
},
- //Commented as its not used anymore.
- //NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE("No fact update periods for given range"),
+ NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE("No fact update periods for given range"),
// no candidate update periods, update period cause will have why each
// update period is not a candidate
@@ -164,44 +152,37 @@ public class CandidateTablePruneCause {
NO_COLUMN_PART_OF_A_JOIN_PATH("No column part of a join path. Join columns: [%s]") {
Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) {
- List<String> columns = new ArrayList<String>();
- for (CandidateTablePruneCause cause : causes) {
- columns.addAll(cause.getJoinColumns());
- }
- return new String[]{columns.toString()};
+ return new String[]{
+ causes.stream().map(CandidateTablePruneCause::getJoinColumns).flatMap(Collection::stream)
+ .collect(toSet()).toString()
+ };
}
},
// cube table is an aggregated fact and queried column is not under default
// aggregate
MISSING_DEFAULT_AGGREGATE("Columns: [%s] are missing default aggregate") {
Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) {
- List<String> columns = new ArrayList<String>();
- for (CandidateTablePruneCause cause : causes) {
- columns.addAll(cause.getColumnsMissingDefaultAggregate());
- }
- return new String[]{columns.toString()};
+ return new String[]{
+ causes.stream().map(CandidateTablePruneCause::getColumnsMissingDefaultAggregate).flatMap(Collection::stream)
+ .collect(toSet()).toString()
+ };
}
},
// missing partitions for cube table
MISSING_PARTITIONS("Missing partitions for the cube table: %s") {
Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) {
- Set<Set<String>> missingPartitions = Sets.newHashSet();
- for (CandidateTablePruneCause cause : causes) {
- missingPartitions.add(cause.getMissingPartitions());
- }
- return new String[]{missingPartitions.toString()};
+ return new String[]{
+ causes.stream().map(CandidateTablePruneCause::getMissingPartitions).collect(toSet()).toString()
+ };
}
},
// incomplete data in the fact
- INCOMPLETE_PARTITION("Data is incomplete. Details : %s") {
+ INCOMPLETE_PARTITION("Data for the requested metrics is only partially complete. Partially complete metrics are:"
+ + " %s. Please try again later or rerun after removing incomplete metrics") {
Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) {
- Set<Map<String, Map<String, Float>>> incompletePartitions = Sets.newHashSet();
- for (CandidateTablePruneCause cause : causes) {
- if (cause.getIncompletePartitions() != null) {
- incompletePartitions.add(cause.getIncompletePartitions());
- }
- }
- return new String[]{incompletePartitions.toString()};
+ return new String[]{
+ causes.stream().map(CandidateTablePruneCause::getIncompletePartitions).collect(toSet()).toString()
+ };
}
};
@@ -227,8 +208,9 @@ public class CandidateTablePruneCause {
public enum SkipUpdatePeriodCode {
// invalid update period
INVALID,
- // Query max interval is more than update period
- QUERY_INTERVAL_BIGGER
+ //this update period is greater than the Query max interval as provided by user with lens.cube.query.max.interval
+ UPDATE_PERIOD_BIGGER_THAN_MAX,
+ QUERY_INTERVAL_SMALLER_THAN_UPDATE_PERIOD
}
// Used for Test cases only.
@@ -244,11 +226,11 @@ public class CandidateTablePruneCause {
// populated only incase of missing update periods cause
private List<String> missingUpdatePeriods;
// populated in case of missing columns
- private List<String> missingColumns;
+ private Set<String> missingColumns;
// populated in case of expressions not evaluable
private List<String> missingExpressions;
// populated in case of no column part of a join path
- private List<String> joinColumns;
+ private Collection<String> joinColumns;
// the columns that are missing default aggregate. only set in case of MISSING_DEFAULT_AGGREGATE
private List<String> columnsMissingDefaultAggregate;
// if a time dim is not supported by the fact. Would be set if and only if
@@ -268,54 +250,46 @@ public class CandidateTablePruneCause {
}
// Different static constructors for different causes.
- public static CandidateTablePruneCause storageNotAvailableInRange(List<TimeRange> ranges) {
+ static CandidateTablePruneCause storageNotAvailableInRange(List<TimeRange> ranges) {
CandidateTablePruneCause cause = new CandidateTablePruneCause(STORAGE_NOT_AVAILABLE_IN_RANGE);
cause.invalidRanges = ranges;
return cause;
}
- public static CandidateTablePruneCause timeDimNotSupported(Set<String> unsupportedTimeDims) {
+ static CandidateTablePruneCause timeDimNotSupported(Set<String> unsupportedTimeDims) {
CandidateTablePruneCause cause = new CandidateTablePruneCause(TIMEDIM_NOT_SUPPORTED);
cause.unsupportedTimeDims = unsupportedTimeDims;
return cause;
}
- public static CandidateTablePruneCause columnNotFound(CandidateTablePruneCode pruneCode,
- Collection<String>... missingColumns) {
- List<String> colList = new ArrayList<String>();
- for (Collection<String> missing : missingColumns) {
- colList.addAll(missing);
- }
- CandidateTablePruneCause cause = new CandidateTablePruneCause(pruneCode);
- cause.setMissingColumns(colList);
+ static CandidateTablePruneCause columnNotFound(Collection<String> missingColumns) {
+ CandidateTablePruneCause cause = new CandidateTablePruneCause(COLUMN_NOT_FOUND);
+ cause.setMissingColumns(Sets.newHashSet(missingColumns));
+ return cause;
+ }
+ static CandidateTablePruneCause denormColumnNotFound(Collection<String> missingColumns) {
+ CandidateTablePruneCause cause = new CandidateTablePruneCause(DENORM_COLUMN_NOT_FOUND);
+ cause.setMissingColumns(Sets.newHashSet(missingColumns));
return cause;
}
- public static CandidateTablePruneCause columnNotFound(CandidateTablePruneCode pruneCode, String... columns) {
- List<String> colList = new ArrayList<String>();
- for (String column : columns) {
- colList.add(column);
- }
- return columnNotFound(pruneCode, colList);
+ static CandidateTablePruneCause columnNotFound(String... columns) {
+ return columnNotFound(newArrayList(columns));
}
- public static CandidateTablePruneCause expressionNotEvaluable(String... exprs) {
- List<String> colList = new ArrayList<String>();
- for (String column : exprs) {
- colList.add(column);
- }
+ static CandidateTablePruneCause expressionNotEvaluable(String... exprs) {
CandidateTablePruneCause cause = new CandidateTablePruneCause(EXPRESSION_NOT_EVALUABLE);
- cause.setMissingExpressions(colList);
+ cause.setMissingExpressions(newArrayList(exprs));
return cause;
}
- public static CandidateTablePruneCause missingPartitions(Set<String> nonExistingParts) {
+ static CandidateTablePruneCause missingPartitions(Set<String> nonExistingParts) {
CandidateTablePruneCause cause =
new CandidateTablePruneCause(MISSING_PARTITIONS);
cause.setMissingPartitions(nonExistingParts);
return cause;
}
- public static CandidateTablePruneCause incompletePartitions(Map<String, Map<String, Float>> incompleteParts) {
+ static CandidateTablePruneCause incompletePartitions(Map<String, Map<String, Float>> incompleteParts) {
CandidateTablePruneCause cause = new CandidateTablePruneCause(INCOMPLETE_PARTITION);
//incompleteParts may be null when partial data is allowed.
cause.setIncompletePartitions(incompleteParts);
@@ -325,17 +299,13 @@ public class CandidateTablePruneCause {
public static CandidateTablePruneCause noColumnPartOfAJoinPath(final Collection<String> colSet) {
CandidateTablePruneCause cause =
new CandidateTablePruneCause(NO_COLUMN_PART_OF_A_JOIN_PATH);
- cause.setJoinColumns(new ArrayList<String>() {
- {
- addAll(colSet);
- }
- });
+ cause.setJoinColumns(colSet);
return cause;
}
- public static CandidateTablePruneCause missingDefaultAggregate(String... names) {
+ static CandidateTablePruneCause missingDefaultAggregate(String... names) {
CandidateTablePruneCause cause = new CandidateTablePruneCause(MISSING_DEFAULT_AGGREGATE);
- cause.setColumnsMissingDefaultAggregate(Lists.newArrayList(names));
+ cause.setColumnsMissingDefaultAggregate(newArrayList(names));
return cause;
}
@@ -344,8 +314,8 @@ public class CandidateTablePruneCause {
* @param dimStoragePruningCauses
* @return
*/
- public static CandidateTablePruneCause noCandidateStoragesForDimtable(
- Map<String, CandidateTablePruneCode> dimStoragePruningCauses) {
+ static CandidateTablePruneCause noCandidateStoragesForDimtable(
+ Map<String, CandidateTablePruneCode> dimStoragePruningCauses) {
CandidateTablePruneCause cause = new CandidateTablePruneCause(NO_CANDIDATE_STORAGES);
cause.setDimStoragePruningCauses(new HashMap<String, CandidateTablePruneCode>());
for (Map.Entry<String, CandidateTablePruneCode> entry : dimStoragePruningCauses.entrySet()) {
@@ -361,6 +331,9 @@ public class CandidateTablePruneCause {
* @param missingPartitionColumns
* @return
*/
+ public static CandidateTablePruneCause partitionColumnsMissing(final String... missingPartitionColumns) {
+ return partitionColumnsMissing(Lists.newArrayList(missingPartitionColumns));
+ }
public static CandidateTablePruneCause partitionColumnsMissing(final List<String> missingPartitionColumns) {
CandidateTablePruneCause cause = new CandidateTablePruneCause(PART_COL_DOES_NOT_EXIST);
cause.nonExistantPartCols = missingPartitionColumns;
@@ -372,7 +345,7 @@ public class CandidateTablePruneCause {
* @param updatePeriodRejectionCause
* @return
*/
- public static CandidateTablePruneCause updatePeriodsRejected(
+ static CandidateTablePruneCause updatePeriodsRejected(
final Map<String, SkipUpdatePeriodCode> updatePeriodRejectionCause) {
CandidateTablePruneCause cause = new CandidateTablePruneCause(NO_CANDIDATE_UPDATE_PERIODS);
cause.updatePeriodRejectionCause = updatePeriodRejectionCause;