You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by am...@apache.org on 2017/02/20 09:38:53 UTC
[2/2] lens git commit: LENS-1386 : Add support for separate tables
for update periods in one storage
LENS-1386 : Add support for separate tables for update periods in one storage
Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/f0dadd79
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/f0dadd79
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/f0dadd79
Branch: refs/heads/master
Commit: f0dadd79bb626fe6f8bbf21569e3062aeb9be070
Parents: 0cd22b1
Author: Lavkesh Lahngir <la...@linux.com>
Authored: Mon Feb 20 15:08:40 2017 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Mon Feb 20 15:08:40 2017 +0530
----------------------------------------------------------------------
lens-api/src/main/resources/cube-0.1.xsd | 28 +-
.../lens/cube/metadata/CubeFactTable.java | 68 +++-
.../lens/cube/metadata/CubeMetastoreClient.java | 339 +++++++++++--------
.../lens/cube/metadata/MetastoreUtil.java | 6 +
.../org/apache/lens/cube/metadata/Storage.java | 30 +-
.../cube/metadata/TestCubeMetastoreClient.java | 151 ++++++++-
.../metastore/CubeMetastoreServiceImpl.java | 182 ++++++----
.../apache/lens/server/metastore/JAXBUtils.java | 66 +++-
.../server/metastore/TestMetastoreService.java | 186 +++++++++-
9 files changed, 811 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-api/src/main/resources/cube-0.1.xsd
----------------------------------------------------------------------
diff --git a/lens-api/src/main/resources/cube-0.1.xsd b/lens-api/src/main/resources/cube-0.1.xsd
index f438f48..431d68b 100644
--- a/lens-api/src/main/resources/cube-0.1.xsd
+++ b/lens-api/src/main/resources/cube-0.1.xsd
@@ -681,8 +681,27 @@
</xs:complexType>
<xs:complexType name="x_update_periods">
- <xs:sequence>
+ <xs:annotation>
+ <xs:documentation>
+ A list of update_period which contains either update period table descriptor or list of update_peroid enum.
+ </xs:documentation>
+ </xs:annotation>
+ <xs:choice maxOccurs="1" minOccurs="0">
+ <xs:element name="update_period_table_descriptor" type="x_update_period_table_descriptor" maxOccurs="unbounded"
+ minOccurs="0"/>
<xs:element name="update_period" type="x_update_period" maxOccurs="unbounded" minOccurs="0"/>
+ </xs:choice>
+ </xs:complexType>
+
+ <xs:complexType name="x_update_period_table_descriptor">
+ <xs:annotation>
+ <xs:documentation>
+ An update period descriptor keeps an enum of update period and a storage table descriptor.
+ </xs:documentation>
+ </xs:annotation>
+ <xs:sequence>
+ <xs:element name="update_period" type="x_update_period" maxOccurs="1" minOccurs="1"/>
+ <xs:element name="table_desc" type="x_storage_table_desc" maxOccurs="1" minOccurs="1"/>
</xs:sequence>
</xs:complexType>
@@ -1001,13 +1020,14 @@
<xs:complexType name="x_storage_table_element">
<xs:annotation>
<xs:documentation>
- Storage and storage table description and update periods
+ Storage and storage table description and update periods. table_desc is invalid when update_periods has a list
+ of update_period_table_descriptor instead of a list of enums.
</xs:documentation>
</xs:annotation>
<xs:sequence>
- <xs:element name="update_periods" type="x_update_periods" maxOccurs="1" minOccurs="0"/>
+ <xs:element name="update_periods" type="x_update_periods" maxOccurs="1" minOccurs="1"/>
<xs:element name="storage_name" type="xs:string"/>
- <xs:element type="x_storage_table_desc" name="table_desc"/>
+ <xs:element type="x_storage_table_desc" name="table_desc" maxOccurs="1" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java
index adb6c92..896a7a1 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java
@@ -29,10 +29,14 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.metadata.Table;
import com.google.common.collect.Lists;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CubeFactTable extends AbstractCubeTable {
+ @Getter
+ // Map<StorageName, Map<update_period, storage_table_prefix>>
+ private final Map<String, Map<UpdatePeriod, String>> storagePrefixUpdatePeriodMap;
private String cubeName;
private final Map<String, Set<UpdatePeriod>> storageUpdatePeriods;
@@ -40,8 +44,10 @@ public class CubeFactTable extends AbstractCubeTable {
super(hiveTable);
this.storageUpdatePeriods = getUpdatePeriods(getName(), getProperties());
this.cubeName = getCubeName(getName(), getProperties());
+ this.storagePrefixUpdatePeriodMap = getUpdatePeriodMap(getName(), getProperties());
}
+
public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns,
Map<String, Set<UpdatePeriod>> storageUpdatePeriods) {
this(cubeName, factName, columns, storageUpdatePeriods, 0L, new HashMap<String, String>());
@@ -54,9 +60,18 @@ public class CubeFactTable extends AbstractCubeTable {
public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns,
Map<String, Set<UpdatePeriod>> storageUpdatePeriods, double weight, Map<String, String> properties) {
+ this(cubeName, factName, columns, storageUpdatePeriods, weight, properties,
+ new HashMap<String, Map<UpdatePeriod, String>>());
+
+ }
+
+ public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns,
+ Map<String, Set<UpdatePeriod>> storageUpdatePeriods, double weight, Map<String, String> properties,
+ Map<String, Map<UpdatePeriod, String>> storagePrefixUpdatePeriodMap) {
super(factName, columns, properties, weight);
this.cubeName = cubeName;
this.storageUpdatePeriods = storageUpdatePeriods;
+ this.storagePrefixUpdatePeriodMap = storagePrefixUpdatePeriodMap;
addProperties();
}
@@ -65,6 +80,18 @@ public class CubeFactTable extends AbstractCubeTable {
super.addProperties();
addCubeNames(getName(), getProperties(), cubeName);
addUpdatePeriodProperies(getName(), getProperties(), storageUpdatePeriods);
+ addStorageTableProperties(getName(), getProperties(), storagePrefixUpdatePeriodMap);
+ }
+
+ private void addStorageTableProperties(String name, Map<String, String> properties,
+ Map<String, Map<UpdatePeriod, String>> storageUpdatePeriodMap) {
+ for (String storageName : storageUpdatePeriodMap.keySet()) {
+ String prefix = MetastoreUtil.getFactKeyPrefix(name) + "." + storageName;
+ for (Map.Entry updatePeriodEntry : storageUpdatePeriodMap.get(storageName).entrySet()) {
+ String updatePeriod = ((UpdatePeriod) updatePeriodEntry.getKey()).getName();
+ properties.put(prefix + "." + updatePeriod, (String) updatePeriodEntry.getValue());
+ }
+ }
}
private static void addUpdatePeriodProperies(String name, Map<String, String> props,
@@ -82,7 +109,29 @@ public class CubeFactTable extends AbstractCubeTable {
props.put(MetastoreUtil.getFactCubeNameKey(factName), cubeName);
}
- private static Map<String, Set<UpdatePeriod>> getUpdatePeriods(String name, Map<String, String> props) {
+ private Map<String, Map<UpdatePeriod, String>> getUpdatePeriodMap(String factName, Map<String, String> props) {
+ Map<String, Map<UpdatePeriod, String>> ret = new HashMap<>();
+ for (Map.Entry entry : storageUpdatePeriods.entrySet()) {
+ String storage = (String) entry.getKey();
+ for (UpdatePeriod period : (Set<UpdatePeriod>) entry.getValue()) {
+ String storagePrefixKey = MetastoreUtil
+ .getUpdatePeriodStoragePrefixKey(factName.trim(), storage, period.getName());
+ String storageTableNamePrefix = props.get(storagePrefixKey);
+ if (storageTableNamePrefix == null) {
+ storageTableNamePrefix = storage;
+ }
+ Map<UpdatePeriod, String> mapOfUpdatePeriods = ret.get(storage);
+ if (mapOfUpdatePeriods == null) {
+ mapOfUpdatePeriods = new HashMap<>();
+ ret.put(storage, mapOfUpdatePeriods);
+ }
+ mapOfUpdatePeriods.put(period, storageTableNamePrefix);
+ }
+ }
+ return ret;
+ }
+
+ private Map<String, Set<UpdatePeriod>> getUpdatePeriods(String name, Map<String, String> props) {
Map<String, Set<UpdatePeriod>> storageUpdatePeriods = new HashMap<>();
String storagesStr = props.get(MetastoreUtil.getFactStorageListKey(name));
if (!StringUtils.isBlank(storagesStr)) {
@@ -273,13 +322,16 @@ public class CubeFactTable extends AbstractCubeTable {
/**
* Add a storage with specified update periods
- *
* @param storage
* @param updatePeriods
+ * @param updatePeriodStoragePrefix
*/
- void addStorage(String storage, Set<UpdatePeriod> updatePeriods) {
+ void addStorage(String storage, Set<UpdatePeriod> updatePeriods,
+ Map<UpdatePeriod, String> updatePeriodStoragePrefix) {
storageUpdatePeriods.put(storage, updatePeriods);
+ storagePrefixUpdatePeriodMap.put(storage, updatePeriodStoragePrefix);
addUpdatePeriodProperies(getName(), getProperties(), storageUpdatePeriods);
+ addStorageTableProperties(getName(), getProperties(), storagePrefixUpdatePeriodMap);
}
/**
@@ -289,6 +341,12 @@ public class CubeFactTable extends AbstractCubeTable {
*/
void dropStorage(String storage) {
storageUpdatePeriods.remove(storage);
+ String prefix = MetastoreUtil.getFactKeyPrefix(getName()) + "." + storage;
+ for (Map.Entry updatePeriodEntry : storagePrefixUpdatePeriodMap.get(storage).entrySet()) {
+ String updatePeriod = ((UpdatePeriod)updatePeriodEntry.getKey()).getName();
+ getProperties().remove(prefix + "." + updatePeriod);
+ }
+ storagePrefixUpdatePeriodMap.remove(storage);
getProperties().remove(MetastoreUtil.getFactUpdatePeriodKey(getName(), storage));
String newStorages = StringUtils.join(storageUpdatePeriods.keySet(), ",");
getProperties().put(MetastoreUtil.getFactStorageListKey(getName()), newStorages);
@@ -351,5 +409,7 @@ public class CubeFactTable extends AbstractCubeTable {
return Collections.min(Lists.newArrayList(getRelativeEndTime(), getAbsoluteEndTime()));
}
-
+ public String getTablePrefix(String storage, UpdatePeriod updatePeriod) {
+ return storagePrefixUpdatePeriodMap.get(storage).get(updatePeriod);
+ }
}
http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java
index 6c9cde2..087c203 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java
@@ -31,7 +31,7 @@ import org.apache.lens.cube.metadata.Storage.LatestInfo;
import org.apache.lens.cube.metadata.Storage.LatestPartColumnInfo;
import org.apache.lens.cube.metadata.timeline.PartitionTimeline;
import org.apache.lens.cube.metadata.timeline.PartitionTimelineFactory;
-import org.apache.lens.server.api.*;
+import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.metastore.DataCompletenessChecker;
import org.apache.lens.server.api.util.LensUtil;
@@ -121,7 +121,13 @@ public class CubeMetastoreClient {
if (ind <= 0) {
throw new LensException("storageTable: " + storageTableName + ", does not belong to fact: " + fact.getName());
}
- return storageTableName.substring(0, ind - StorageConstants.STORGAE_SEPARATOR.length());
+ String name = storageTableName.substring(0, ind - StorageConstants.STORGAE_SEPARATOR.length());
+ for (String storageName : fact.getStorages()) {
+ if (name.equalsIgnoreCase(storageName)) {
+ return storageName;
+ }
+ }
+ throw new LensException("storageTable: " + storageTableName + ", does not belong to fact: " + fact.getName());
}
/**
@@ -169,11 +175,11 @@ public class CubeMetastoreClient {
UpdatePeriod updatePeriod = updatePeriodStr == null ? null : UpdatePeriod.valueOf(updatePeriodStr.toUpperCase());
List<PartitionTimeline> ret = Lists.newArrayList();
CubeFactTable fact = getCubeFact(factName);
- List<String> keys = Lists.newArrayList();
+ List<String> storageList = Lists.newArrayList();
if (storage != null) {
- keys.add(storage);
+ storageList.add(storage);
} else {
- keys.addAll(fact.getStorages());
+ storageList.addAll(fact.getStorages());
}
String partCol = null;
if (timeDimension != null) {
@@ -186,9 +192,9 @@ public class CubeMetastoreClient {
}
partCol = baseCube.getPartitionColumnOfTimeDim(timeDimension);
}
- for (String key : keys) {
+ for (String storageName : storageList) {
for (Map.Entry<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>> entry : partitionTimelineCache
- .get(factName, key).entrySet()) {
+ .get(factName, storageName).entrySet()) {
if (updatePeriod == null || entry.getKey().equals(updatePeriod)) {
for (Map.Entry<String, PartitionTimeline> entry1 : entry.getValue().entrySet()) {
if (partCol == null || partCol.equals(entry1.getKey())) {
@@ -201,25 +207,30 @@ public class CubeMetastoreClient {
return ret;
}
- public void updatePartition(String fact, String storageName, Partition partition)
+ public void updatePartition(String fact, String storageName, Partition partition, UpdatePeriod updatePeriod)
throws HiveException, InvalidOperationException, LensException {
- updatePartitions(fact, storageName, Collections.singletonList(partition));
+ Map<UpdatePeriod, List<Partition>> updatePeriodListMap = new HashMap<>();
+ updatePeriodListMap.put(updatePeriod, Collections.singletonList(partition));
+ updatePartitions(fact, storageName, updatePeriodListMap);
}
- public void updatePartitions(String factOrDimtableName, String storageName, List<Partition> partitions)
- throws HiveException, InvalidOperationException, LensException {
- List<Partition> partitionsToAlter = Lists.newArrayList();
- partitionsToAlter.addAll(partitions);
- partitionsToAlter.addAll(getAllLatestPartsEquivalentTo(factOrDimtableName, storageName, partitions));
- getStorage(storageName).updatePartitions(getClient(), factOrDimtableName, partitionsToAlter);
+ public void updatePartitions(String factOrDimtableName, String storageName,
+ Map<UpdatePeriod, List<Partition>> partitions) throws HiveException, InvalidOperationException, LensException {
+ for (Map.Entry entry : partitions.entrySet()) {
+ List<Partition> partitionsToAlter = Lists.newArrayList();
+ partitionsToAlter.addAll((List<Partition>) entry.getValue());
+ String storageTableName = getStorageTableName(factOrDimtableName, storageName, (UpdatePeriod) entry.getKey());
+ partitionsToAlter.addAll(
+ getAllLatestPartsEquivalentTo(factOrDimtableName, storageTableName, (List<Partition>) entry.getValue()));
+ getStorage(storageName).updatePartitions(storageTableName, getClient(), factOrDimtableName, partitionsToAlter);
+ }
}
- private List<Partition> getAllLatestPartsEquivalentTo(String factOrDimtableName, String storageName,
+ private List<Partition> getAllLatestPartsEquivalentTo(String factOrDimtableName, String storageTableName,
List<Partition> partitions) throws HiveException, LensException {
if (isFactTable(factOrDimtableName)) {
return Lists.newArrayList();
}
- String storageTableName = getFactOrDimtableStorageTableName(factOrDimtableName, storageName);
Table storageTable = getTable(storageTableName);
List<String> timePartCols = getTimePartColNamesOfTable(storageTable);
List<Partition> latestParts = Lists.newArrayList();
@@ -279,6 +290,17 @@ public class CubeMetastoreClient {
}
}
+ public void createCubeFactTable(String cubeName, String factName, List<FieldSchema> columns,
+ Map<String, Set<UpdatePeriod>> storageAggregatePeriods, double weight, Map<String, String> properties,
+ Map<String, StorageTableDesc> storageTableDescs, Map<String, Map<UpdatePeriod, String>> storageUpdatePeriodMap)
+ throws LensException {
+ CubeFactTable factTable = new CubeFactTable(cubeName, factName, columns, storageAggregatePeriods, weight,
+ properties, storageUpdatePeriodMap);
+ createCubeTable(factTable, storageTableDescs);
+ // do a get to update cache
+ getCubeFact(factName);
+
+ }
/**
* In-memory storage of {@link PartitionTimeline} objects for each valid
@@ -327,48 +349,75 @@ public class CubeMetastoreClient {
public TreeMap<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>> get(String fact, String storage)
throws HiveException, LensException {
// SUSPEND CHECKSTYLE CHECK DoubleCheckedLockingCheck
- String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage));
- if (get(storageTableName) == null) {
- synchronized (this) {
- if (get(storageTableName) == null) {
- Table storageTable = getTable(storageTableName);
- if ("true".equalsIgnoreCase(storageTable.getParameters().get(getPartitionTimelineCachePresenceKey()))) {
- try {
- loadTimelinesFromTableProperties(fact, storage);
- } catch (Exception e) {
- // Ideally this should never come. But since we have another source,
- // let's piggyback on that for loading timeline
- log.error("Error while loading timelines from table properties.", e);
- loadTimelinesFromAllPartitions(fact, storage);
- }
- } else {
- loadTimelinesFromAllPartitions(fact, storage);
+ // Unique key for the timeline cache, based on storageName and fact.
+ String timeLineKey = (Storage.getPrefix(storage)+ fact).toLowerCase();
+ synchronized (this) {
+ if (get(timeLineKey) == null) {
+ loadTimeLines(fact, storage, timeLineKey);
+ }
+ log.info("timeline for {} is: {}", storage, get(timeLineKey));
+ // return the final value from memory
+ return get(timeLineKey);
+ // RESUME CHECKSTYLE CHECK DoubleCheckedLockingCheck
+ }
+ }
+
+ /**
+ * @param fact
+ * @param storage
+ */
+ private void loadTimeLines(String fact, String storage, String timeLineKey) throws LensException, HiveException {
+ Set<String> uniqueStorageTables = new HashSet<>();
+ Map<UpdatePeriod, String> updatePeriodTableName = new HashMap<>();
+ for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) {
+ String storageTableName = getStorageTableName(fact, storage, updatePeriod);
+ updatePeriodTableName.put(updatePeriod, storageTableName);
+ Table storageTable = getTable(storageTableName);
+ if ("true".equalsIgnoreCase(storageTable.getParameters().get(getPartitionTimelineCachePresenceKey()))) {
+ try {
+ loadTimelinesFromTableProperties(updatePeriod, storageTableName, timeLineKey);
+ } catch (Exception e) {
+ // Ideally this should never come. But since we have another source,
+ // let's piggyback on that for loading timeline
+ log.error("Error while loading timelines from table properties.", e);
+ ensureEntryForTimeLineKey(fact, storage, updatePeriod, storageTableName, timeLineKey);
+ if (!uniqueStorageTables.contains(storageTableName)) {
+ uniqueStorageTables.add(storageTableName);
+ loadTimelinesFromAllPartitions(storageTableName, timeLineKey);
}
}
+ } else {
+ ensureEntryForTimeLineKey(fact, storage, updatePeriod, storageTableName, timeLineKey);
+ if (!uniqueStorageTables.contains(storageTableName)) {
+ uniqueStorageTables.add(storageTableName);
+ loadTimelinesFromAllPartitions(storageTableName, timeLineKey);
+ }
}
- log.info("timeline for {} is: {}", storageTableName, get(storageTableName));
}
- // return the final value from memory
- return get(storageTableName);
- // RESUME CHECKSTYLE CHECK DoubleCheckedLockingCheck
+ for (Map.Entry entry : updatePeriodTableName.entrySet()) {
+ alterTablePartitionCache(timeLineKey, (UpdatePeriod) entry.getKey(), (String) entry.getValue());
+ }
}
- private void loadTimelinesFromAllPartitions(String fact, String storage) throws HiveException, LensException {
+ private void ensureEntryForTimeLineKey(String fact, String storage, UpdatePeriod updatePeriod,
+ String storageTableName, String timeLineKey) throws LensException {
// Not found in table properties either, compute from all partitions of the fact-storage table.
// First make sure all combinations of update period and partition column have an entry even
// if no partitions exist
- String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage));
- log.info("loading from all partitions: {}", storageTableName);
- Table storageTable = getTable(storageTableName);
- if (getCubeFact(fact).getUpdatePeriods() != null && getCubeFact(fact).getUpdatePeriods().get(
- storage) != null) {
- for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) {
- for (String partCol : getTimePartColNamesOfTable(storageTable)) {
- ensureEntry(storageTableName, updatePeriod, partCol);
- }
+ if (getCubeFact(fact).getUpdatePeriods() != null && getCubeFact(fact).getUpdatePeriods().get(storage) != null) {
+ log.info("loading from all partitions: {}", storageTableName);
+ Table storageTable = getTable(storageTableName);
+ for (String partCol : getTimePartColNamesOfTable(storageTable)) {
+ ensureEntry(timeLineKey, storageTableName, updatePeriod, partCol);
}
}
+
+ }
+
+ private void loadTimelinesFromAllPartitions(String storageTableName, String timeLineKey)
+ throws HiveException, LensException {
// Then add all existing partitions for batch addition in respective timelines.
+ Table storageTable = getTable(storageTableName);
List<String> timeParts = getTimePartColNamesOfTable(storageTable);
List<FieldSchema> partCols = storageTable.getPartCols();
for (Partition partition : getPartitionsByFilter(storageTableName, null)) {
@@ -382,23 +431,17 @@ public class CubeMetastoreClient {
}
for (int i = 0; i < partCols.size(); i++) {
if (timeParts.contains(partCols.get(i).getName())) {
- addForBatchAddition(storageTableName, period, partCols.get(i).getName(), values.get(i));
+ addForBatchAddition(timeLineKey, storageTableName, period, partCols.get(i).getName(), values.get(i));
}
}
}
- // commit all batch addition for the storage table,
- // which will in-turn commit all batch additions in all it's timelines.
- commitAllBatchAdditions(storageTableName);
}
- private void loadTimelinesFromTableProperties(String fact, String storage) throws HiveException, LensException {
- // found in table properties, load from there.
- String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage));
+ private void loadTimelinesFromTableProperties(UpdatePeriod updatePeriod,
+ String storageTableName, String timeLineKey) throws HiveException, LensException {
log.info("loading from table properties: {}", storageTableName);
- for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) {
- for (String partCol : getTimePartColNamesOfTable(storageTableName)) {
- ensureEntry(storageTableName, updatePeriod, partCol).init(getTable(storageTableName));
- }
+ for (String partCol : getTimePartColNamesOfTable(storageTableName)) {
+ ensureEntry(timeLineKey, storageTableName, updatePeriod, partCol).init(getTable(storageTableName));
}
}
@@ -406,16 +449,17 @@ public class CubeMetastoreClient {
* Adds given partition(for storageTable, updatePeriod, partitionColum=partition) for batch addition in an
* appropriate timeline object. Ignore if partition is not valid.
*
- * @param storageTable storage table
+ * @param timeLineKey key for the timeLine map
+ * @param storageTableName hive table name
* @param updatePeriod update period
* @param partitionColumn partition column
* @param partition partition
*/
- public void addForBatchAddition(String storageTable, UpdatePeriod updatePeriod, String partitionColumn,
- String partition) {
+ public void addForBatchAddition(String timeLineKey, String storageTableName, UpdatePeriod updatePeriod,
+ String partitionColumn, String partition) {
try {
- ensureEntry(storageTable, updatePeriod, partitionColumn).addForBatchAddition(TimePartition.of(updatePeriod,
- partition));
+ ensureEntry(timeLineKey, storageTableName, updatePeriod, partitionColumn)
+ .addForBatchAddition(TimePartition.of(updatePeriod, partition));
} catch (LensException e) {
// to take care of the case where partition name is something like `latest`
log.error("Couldn't parse partition: {} with update period: {}, skipping.", partition, updatePeriod, e);
@@ -427,42 +471,24 @@ public class CubeMetastoreClient {
* <p></p>
* kind of like mkdir -p
*
- * @param storageTable storage table
+ * @param timeLineKey storage table
* @param updatePeriod update period
* @param partitionColumn partition column
* @return timeline if already exists, or puts a new timeline and returns.
*/
- public PartitionTimeline ensureEntry(String storageTable, UpdatePeriod updatePeriod, String partitionColumn) {
- if (get(storageTable) == null) {
- put(storageTable, new TreeMap<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>>());
+ public PartitionTimeline ensureEntry(String timeLineKey, String storagTableName, UpdatePeriod updatePeriod,
+ String partitionColumn) {
+ if (get(timeLineKey) == null) {
+ put(timeLineKey, new TreeMap<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>>());
}
- if (get(storageTable).get(updatePeriod) == null) {
- get(storageTable).put(updatePeriod, new CaseInsensitiveStringHashMap<PartitionTimeline>());
+ if (get(timeLineKey).get(updatePeriod) == null) {
+ get(timeLineKey).put(updatePeriod, new CaseInsensitiveStringHashMap<PartitionTimeline>());
}
- if (get(storageTable).get(updatePeriod).get(partitionColumn) == null) {
- get(storageTable).get(updatePeriod).put(partitionColumn, PartitionTimelineFactory.get(
- CubeMetastoreClient.this, storageTable, updatePeriod, partitionColumn));
- }
- return get(storageTable).get(updatePeriod).get(partitionColumn);
- }
-
- /**
- * commit all batch addition for all its timelines.
- *
- * @param storageTable storage table
- * @throws HiveException
- * @throws LensException
- */
- public void commitAllBatchAdditions(String storageTable) throws HiveException, LensException {
- if (get(storageTable) != null) {
- for (UpdatePeriod updatePeriod : get(storageTable).keySet()) {
- for (String partCol : get(storageTable).get(updatePeriod).keySet()) {
- PartitionTimeline timeline = get(storageTable).get(updatePeriod).get(partCol);
- timeline.commitBatchAdditions();
- }
- }
- alterTablePartitionCache(storageTable);
+ if (get(timeLineKey).get(updatePeriod).get(partitionColumn) == null) {
+ get(timeLineKey).get(updatePeriod).put(partitionColumn, PartitionTimelineFactory.get(
+ CubeMetastoreClient.this, storagTableName, updatePeriod, partitionColumn));
}
+ return get(timeLineKey).get(updatePeriod).get(partitionColumn);
}
/** check partition existence in the appropriate timeline if it exists */
@@ -478,9 +504,11 @@ public class CubeMetastoreClient {
*/
public PartitionTimeline get(String fact, String storage, UpdatePeriod updatePeriod, String partCol)
throws HiveException, LensException {
- return get(fact, storage) != null && get(fact, storage).get(updatePeriod) != null && get(fact, storage).get(
- updatePeriod).get(partCol) != null ? get(fact, storage).get(updatePeriod).get(partCol) : null;
+ return get(fact, storage) != null && get(fact, storage).get(updatePeriod) != null
+ && get(fact, storage).get(updatePeriod).get(partCol) != null ? get(fact, storage).get(updatePeriod)
+ .get(partCol) : null;
}
+
/**
* returns the timeline corresponding to fact-storage table, updatePeriod, partCol. throws exception if not
* exists, which would most probably mean the combination is incorrect.
@@ -489,8 +517,8 @@ public class CubeMetastoreClient {
throws HiveException, LensException {
PartitionTimeline timeline = get(fact, storage, updatePeriod, partCol);
if (timeline == null) {
- throw new LensException(LensCubeErrorCode.TIMELINE_ABSENT.getLensErrorInfo(),
- fact, storage, updatePeriod, partCol);
+ throw new LensException(LensCubeErrorCode.TIMELINE_ABSENT.getLensErrorInfo(), fact, storage, updatePeriod,
+ partCol);
}
return timeline;
}
@@ -519,8 +547,8 @@ public class CubeMetastoreClient {
boolean updated = false;
for (Map.Entry<String, Date> entry : timePartSpec.entrySet()) {
TimePartition part = TimePartition.of(updatePeriod, entry.getValue());
- if (!partitionExistsByFilter(cubeTableName, storageName, StorageConstants.getPartFilter(entry.getKey(),
- part.getDateString()))) {
+ if (!partitionExistsByFilter(cubeTableName, storageName, updatePeriod,
+ StorageConstants.getPartFilter(entry.getKey(), part.getDateString()))) {
get(cubeTableName, storageName, updatePeriod, entry.getKey()).drop(part);
updated = true;
}
@@ -565,10 +593,10 @@ public class CubeMetastoreClient {
Hive.closeCurrent();
}
- private void createOrAlterStorageHiveTable(Table parent, String storage, StorageTableDesc crtTblDesc)
+ private void createOrAlterStorageHiveTable(Table parent, String storageTableNamePrefix, StorageTableDesc crtTblDesc)
throws LensException {
try {
- Table tbl = getStorage(storage).getStorageTable(getClient(), parent, crtTblDesc);
+ Table tbl = Storage.getStorageTable(storageTableNamePrefix, getClient(), parent, crtTblDesc);
if (tableExists(tbl.getTableName())) {
// alter table
alterHiveTable(tbl.getTableName(), tbl);
@@ -730,7 +758,7 @@ public class CubeMetastoreClient {
* @param storageAggregatePeriods Aggregate periods for the storages
* @param weight Weight of the cube
* @param properties Properties of fact table
- * @param storageTableDescs Map of storage to its storage table description
+ * @param storageTableDescs Map of storage table prefix to its storage table description
* @throws LensException
*/
public void createCubeFactTable(String cubeName, String factName, List<FieldSchema> columns,
@@ -808,7 +836,7 @@ public class CubeMetastoreClient {
* Create cube table defined and create all the corresponding storage tables
*
* @param cubeTable Can be fact or dimension table
- * @param storageTableDescs Map of storage to its storage table description
+ * @param storageTableDescs Map of storage tableName prefix to its storage table description
* @throws LensException
*/
public void createCubeTable(AbstractCubeTable cubeTable, Map<String, StorageTableDesc> storageTableDescs)
@@ -836,14 +864,17 @@ public class CubeMetastoreClient {
* @param fact The CubeFactTable
* @param storage The storage
* @param updatePeriods Update periods of the fact on the storage
- * @param storageTableDesc The storage table description
+ * @param storageTableDescs The storage table description
* @throws LensException
*/
public void addStorage(CubeFactTable fact, String storage, Set<UpdatePeriod> updatePeriods,
- StorageTableDesc storageTableDesc) throws LensException {
- fact.addStorage(storage, updatePeriods);
- createOrAlterStorageHiveTable(getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT),
- storage, storageTableDesc);
+ Map<String, StorageTableDesc> storageTableDescs, Map<UpdatePeriod, String> updatePeriodStoragePrefix)
+ throws LensException {
+ fact.addStorage(storage, updatePeriods, updatePeriodStoragePrefix);
+ for (Map.Entry entry : storageTableDescs.entrySet()) {
+ createOrAlterStorageHiveTable(getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT),
+ (String) entry.getKey(), (StorageTableDesc) entry.getValue());
+ }
alterCubeTable(fact.getName(), getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT), fact);
updateFactCache(fact.getName());
}
@@ -860,8 +891,8 @@ public class CubeMetastoreClient {
public void addStorage(CubeDimensionTable dim, String storage, UpdatePeriod dumpPeriod,
StorageTableDesc storageTableDesc) throws LensException {
dim.alterSnapshotDumpPeriod(storage, dumpPeriod);
- createOrAlterStorageHiveTable(getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE),
- storage, storageTableDesc);
+ createOrAlterStorageHiveTable(getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE), storage,
+ storageTableDesc);
alterCubeTable(dim.getName(), getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE), dim);
updateDimCache(dim.getName());
}
@@ -896,10 +927,19 @@ public class CubeMetastoreClient {
return partsAdded;
}
+ /**
+ * @param factOrDimTable
+ * @param storageName
+ * @param updatePeriod
+ * @param storagePartitionDescs
+ * @param type
+ * @return
+ * @throws HiveException
+ * @throws LensException
+ */
private List<Partition> addPartitions(String factOrDimTable, String storageName, UpdatePeriod updatePeriod,
List<StoragePartitionDesc> storagePartitionDescs, CubeTableType type) throws HiveException, LensException {
- String storageTableName = getStorageTableName(factOrDimTable.trim(),
- Storage.getPrefix(storageName.trim())).toLowerCase();
+ String storageTableName = getStorageTableName(factOrDimTable, storageName, updatePeriod);
if (type == CubeTableType.DIM_TABLE) {
// Adding partition in dimension table.
Map<Map<String, String>, LatestInfo> latestInfos = Maps.newHashMap();
@@ -910,7 +950,7 @@ public class CubeMetastoreClient {
}
List<Partition> partsAdded =
getStorage(storageName).addPartitions(getClient(), factOrDimTable, updatePeriod, storagePartitionDescs,
- latestInfos);
+ latestInfos, storageTableName);
ListIterator<Partition> iter = partsAdded.listIterator();
while (iter.hasNext()) {
if (iter.next().getSpec().values().contains(StorageConstants.LATEST_PARTITION_VALUE)) {
@@ -928,10 +968,11 @@ public class CubeMetastoreClient {
// Adding partition in fact table.
if (storagePartitionDescs.size() > 0) {
partsAdded = getStorage(storageName).addPartitions(getClient(), factOrDimTable, updatePeriod,
- storagePartitionDescs, null);
+ storagePartitionDescs, null, storageTableName);
}
// update hive table
- alterTablePartitionCache(getStorageTableName(factOrDimTable, Storage.getPrefix(storageName)));
+ alterTablePartitionCache((Storage.getPrefix(storageName) + factOrDimTable).toLowerCase(), updatePeriod,
+ storageTableName);
return partsAdded;
} else {
throw new LensException("Can't add partitions to anything other than fact or dimtable");
@@ -1018,20 +1059,20 @@ public class CubeMetastoreClient {
}
/**
- * store back all timelines of given storage table to table properties
+ * store back all timelines of given storage to table properties
*
- * @param storageTableName storage table name
+ * @param timeLineKey key for the time line
+ * @param storageTableName Storage table name
* @throws HiveException
*/
- private void alterTablePartitionCache(String storageTableName) throws HiveException, LensException {
+ private void alterTablePartitionCache(String timeLineKey, UpdatePeriod updatePeriod, String storageTableName)
+ throws HiveException, LensException {
Table table = getTable(storageTableName);
Map<String, String> params = table.getParameters();
- if (partitionTimelineCache.get(storageTableName) != null) {
- for (UpdatePeriod updatePeriod : partitionTimelineCache.get(storageTableName).keySet()) {
- for (Map.Entry<String, PartitionTimeline> entry : partitionTimelineCache.get(storageTableName)
- .get(updatePeriod).entrySet()) {
- entry.getValue().updateTableParams(table);
- }
+ if (partitionTimelineCache.get(timeLineKey) != null) {
+ for (Map.Entry<String, PartitionTimeline> entry : partitionTimelineCache.get(timeLineKey).get(updatePeriod)
+ .entrySet()) {
+ entry.getValue().updateTableParams(table);
}
params.put(getPartitionTimelineCachePresenceKey(), "true");
alterHiveTable(storageTableName, table);
@@ -1173,8 +1214,7 @@ public class CubeMetastoreClient {
*/
public void dropPartition(String cubeTableName, String storageName, Map<String, Date> timePartSpec,
Map<String, String> nonTimePartSpec, UpdatePeriod updatePeriod) throws HiveException, LensException {
- String storageTableName = getStorageTableName(cubeTableName.trim(),
- Storage.getPrefix(storageName.trim())).toLowerCase();
+ String storageTableName = getStorageTableName(cubeTableName.trim(), storageName, updatePeriod);
Table hiveTable = getHiveTable(storageTableName);
List<FieldSchema> partCols = hiveTable.getPartCols();
List<String> partColNames = new ArrayList<>(partCols.size());
@@ -1244,7 +1284,8 @@ public class CubeMetastoreClient {
// dropping fact partition
getStorage(storageName).dropPartition(getClient(), storageTableName, partVals, null, null);
if (partitionTimelineCache.updateForDeletion(cubeTableName, storageName, updatePeriod, timePartSpec)) {
- this.alterTablePartitionCache(storageTableName);
+ this.alterTablePartitionCache((Storage.getPrefix(storageName) + cubeTableName).toLowerCase(), updatePeriod,
+ storageTableName);
}
}
}
@@ -1277,7 +1318,7 @@ public class CubeMetastoreClient {
public boolean factPartitionExists(String factName, String storageName, UpdatePeriod updatePeriod,
Map<String, Date> partitionTimestamp,
Map<String, String> partSpec) throws HiveException, LensException {
- String storageTableName = getFactOrDimtableStorageTableName(factName, storageName);
+ String storageTableName = getStorageTableName(factName, storageName, updatePeriod);
return partitionExists(storageTableName, updatePeriod, partitionTimestamp, partSpec);
}
@@ -1286,9 +1327,9 @@ public class CubeMetastoreClient {
return partitionExists(storageTableName, getPartitionSpec(updatePeriod, partitionTimestamps));
}
- public boolean partitionExistsByFilter(String cubeTableName, String storageName, String filter) throws LensException {
- return partitionExistsByFilter(getStorageTableName(cubeTableName, Storage.getPrefix(storageName)),
- filter);
+ public boolean partitionExistsByFilter(String cubeTableName, String storageName, UpdatePeriod updatePeriod,
+ String filter) throws LensException {
+ return partitionExistsByFilter(getStorageTableName(cubeTableName, storageName, updatePeriod), filter);
}
public boolean partitionExistsByFilter(String storageTableName, String filter) throws LensException {
@@ -1354,7 +1395,7 @@ public class CubeMetastoreClient {
boolean latestPartitionExists(String factOrDimTblName, String storageName, String latestPartCol)
throws HiveException, LensException {
- String storageTableName = getStorageTableName(factOrDimTblName, Storage.getPrefix(storageName));
+ String storageTableName = MetastoreUtil.getStorageTableName(factOrDimTblName, Storage.getPrefix(storageName));
if (isDimensionTable(factOrDimTblName)) {
return dimTableLatestPartitionExists(storageTableName);
} else {
@@ -2225,18 +2266,30 @@ public class CubeMetastoreClient {
*/
public void dropStorageFromFact(String factName, String storage) throws LensException {
CubeFactTable cft = getFactTable(factName);
+ dropHiveTablesForStorage(factName, storage);
cft.dropStorage(storage);
- dropHiveTable(getFactOrDimtableStorageTableName(factName, storage));
alterCubeTable(factName, getTableWithTypeFailFast(factName, CubeTableType.FACT), cft);
updateFactCache(factName);
}
+ private void dropHiveTablesForStorage(String factName, String storage) throws LensException{
+ CubeFactTable cft = getFactTable(factName);
+ Set<String> droppedTables = new HashSet<>();
+ for (Map.Entry updatePeriodEntry : cft.getStoragePrefixUpdatePeriodMap().get(storage).entrySet()) {
+ UpdatePeriod updatePeriod = (UpdatePeriod) updatePeriodEntry.getKey();
+ String storageTableName = getStorageTableName(factName, storage, updatePeriod);
+ if (!droppedTables.contains(storageTableName)) {
+ dropHiveTable(storageTableName);
+ }
+ droppedTables.add(storageTableName);
+ }
+ }
// updateFact will be false when fact is fully dropped
private void dropStorageFromFact(String factName, String storage, boolean updateFact)
throws LensException {
- CubeFactTable cft = getFactTable(factName);
- dropHiveTable(getFactOrDimtableStorageTableName(factName, storage));
+ dropHiveTablesForStorage(factName, storage);
if (updateFact) {
+ CubeFactTable cft = getFactTable(factName);
cft.dropStorage(storage);
alterCubeTable(factName, getTableWithTypeFailFast(factName, CubeTableType.FACT), cft);
updateFactCache(factName);
@@ -2432,4 +2485,22 @@ public class CubeMetastoreClient {
Date now = new Date();
return isStorageTableCandidateForRange(storageTableName, resolveDate(fromDate, now), resolveDate(toDate, now));
}
+
+ private String getStorageTablePrefixFromStorage(String factOrDimTableName, String storage, UpdatePeriod updatePeriod)
+ throws LensException {
+ if (updatePeriod == null) {
+ return storage;
+ }
+ if (isFactTable(factOrDimTableName)) {
+ return getFactTable(factOrDimTableName).getTablePrefix(storage, updatePeriod);
+ } else {
+ return storage;
+ }
+ }
+
+ public String getStorageTableName(String factOrDimTableName, String storage, UpdatePeriod updatePeriod)
+ throws LensException {
+ return MetastoreUtil.getFactOrDimtableStorageTableName(factOrDimTableName,
+ getStorageTablePrefixFromStorage(factOrDimTableName, storage, updatePeriod));
+ }
}
http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java
index 53cf8af..57d4502 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java
@@ -590,4 +590,10 @@ public class MetastoreUtil {
}
return copy;
}
+
+ public static String getUpdatePeriodStoragePrefixKey(String factTableName , String storageName, String updatePeriod) {
+ return MetastoreUtil.getFactKeyPrefix(factTableName) + "." + storageName + "." + updatePeriod;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java
index cd9f705..936add4 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java
@@ -124,14 +124,18 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta
/**
* Get the storage table descriptor for the given parent table.
*
+ * @param storageTableNamePrefix Storage table prefix based on update period
* @param client The metastore client
* @param parent Is either Fact or Dimension table
* @param crtTbl Create table info
* @return Table describing the storage table
* @throws HiveException
*/
- public Table getStorageTable(Hive client, Table parent, StorageTableDesc crtTbl) throws HiveException {
- String storageTableName = MetastoreUtil.getStorageTableName(parent.getTableName(), this.getPrefix());
+ public static Table getStorageTable(String storageTableNamePrefix, Hive client, Table parent, StorageTableDesc crtTbl)
+ throws HiveException {
+ // Change it to the appropriate storage table name.
+ String storageTableName = MetastoreUtil
+ .getStorageTableName(parent.getTableName(), Storage.getPrefix(storageTableNamePrefix));
Table tbl = client.getTable(storageTableName, false);
if (tbl == null) {
tbl = client.newTable(storageTableName);
@@ -235,21 +239,6 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta
}
/**
- * Add single partition to storage. Just calls #addPartitions.
- * @param client
- * @param addPartitionDesc
- * @param latestInfo
- * @throws HiveException
- */
- public List<Partition> addPartition(Hive client, StoragePartitionDesc addPartitionDesc, LatestInfo latestInfo)
- throws HiveException {
- Map<Map<String, String>, LatestInfo> latestInfos = Maps.newHashMap();
- latestInfos.put(addPartitionDesc.getNonTimePartSpec(), latestInfo);
- return addPartitions(client, addPartitionDesc.getCubeTableName(), addPartitionDesc.getUpdatePeriod(),
- Collections.singletonList(addPartitionDesc), latestInfos);
- }
-
- /**
* Add given partitions in the underlying hive table and update latest partition links
*
* @param client hive client instance
@@ -262,12 +251,11 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta
*/
public List<Partition> addPartitions(Hive client, String factOrDimTable, UpdatePeriod updatePeriod,
List<StoragePartitionDesc> storagePartitionDescs,
- Map<Map<String, String>, LatestInfo> latestInfos) throws HiveException {
+ Map<Map<String, String>, LatestInfo> latestInfos, String tableName) throws HiveException {
preAddPartitions(storagePartitionDescs);
Map<Map<String, String>, Map<String, Integer>> latestPartIndexForPartCols = Maps.newHashMap();
boolean success = false;
try {
- String tableName = MetastoreUtil.getStorageTableName(factOrDimTable, this.getPrefix());
String dbName = SessionState.get().getCurrentDatabase();
AddPartitionDesc addParts = new AddPartitionDesc(dbName, tableName, true);
Table storageTbl = client.getTable(dbName, tableName);
@@ -383,11 +371,11 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta
* @throws InvalidOperationException
* @throws HiveException
*/
- public void updatePartitions(Hive client, String fact, List<Partition> partitions)
+ public void updatePartitions(String storageTable, Hive client, String fact, List<Partition> partitions)
throws InvalidOperationException, HiveException {
boolean success = false;
try {
- client.alterPartitions(MetastoreUtil.getFactOrDimtableStorageTableName(fact, getName()), partitions, null);
+ client.alterPartitions(storageTable, partitions, null);
success = true;
} finally {
if (success) {
http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-cube/src/test/java/org/apache/lens/cube/metadata/TestCubeMetastoreClient.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/test/java/org/apache/lens/cube/metadata/TestCubeMetastoreClient.java b/lens-cube/src/test/java/org/apache/lens/cube/metadata/TestCubeMetastoreClient.java
index e21dc2a..950534c 100644
--- a/lens-cube/src/test/java/org/apache/lens/cube/metadata/TestCubeMetastoreClient.java
+++ b/lens-cube/src/test/java/org/apache/lens/cube/metadata/TestCubeMetastoreClient.java
@@ -28,6 +28,7 @@ import static org.apache.lens.server.api.util.LensUtil.getHashMap;
import static org.testng.Assert.*;
import java.text.SimpleDateFormat;
+
import java.util.*;
import org.apache.lens.cube.error.LensCubeErrorCode;
@@ -45,7 +46,10 @@ import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
-import org.apache.hadoop.hive.ql.metadata.*;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
@@ -965,6 +969,132 @@ public class TestCubeMetastoreClient {
assertTrue(client.getAllFacts(altered).isEmpty());
}
+ @Test(priority = 1)
+ public void testUpdatePeriodTableDescriptions() throws LensException, HiveException {
+ List<FieldSchema> factColumns = new ArrayList<>(cubeMeasures.size());
+ String factName = "testFactWithUpdatePeriodTableDescriptions";
+
+ for (CubeMeasure measure : cubeMeasures) {
+ factColumns.add(measure.getColumn());
+ }
+ // add one dimension of the cube
+ factColumns.add(new FieldSchema("zipcode", "int", "zip"));
+ FieldSchema itPart = new FieldSchema("it", "string", "date part");
+ FieldSchema etPart = new FieldSchema("et", "string", "date part");
+ String[] partColNames = new String[] { getDatePartitionKey(), itPart.getName(), etPart.getName() };
+
+ StorageTableDesc s1 = new StorageTableDesc(TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class,
+ Lists.newArrayList(getDatePartition(), itPart, etPart),
+ Lists.newArrayList(getDatePartitionKey(), itPart.getName(), etPart.getName()));
+ StorageTableDesc s2 = new StorageTableDesc(TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class,
+ Lists.newArrayList(getDatePartition(), itPart, etPart),
+ Lists.newArrayList(getDatePartitionKey(), itPart.getName(), etPart.getName()));
+
+ Map<String, Set<UpdatePeriod>> updatePeriods = getHashMap(c1, hourlyAndDaily, c2, hourlyAndDaily);
+ Map<String, StorageTableDesc> storageTables = getHashMap(HOURLY + "_" + c1, s1, DAILY + "_" + c1, s2, c2, s2);
+ Map<String, Map<UpdatePeriod, String>> storageUpdatePeriodMap = getHashMap(c1,
+ getHashMap(HOURLY, HOURLY + "_" + c1, DAILY, DAILY + "_" + c1), c2, getHashMap(HOURLY, c2, DAILY, c2));
+
+ CubeFactTable cubeFact = new CubeFactTable(CUBE_NAME, factName, factColumns, updatePeriods, 0L, null,
+ storageUpdatePeriodMap);
+ client.createCubeFactTable(CUBE_NAME, factName, factColumns, updatePeriods, 0L, null, storageTables,
+ storageUpdatePeriodMap);
+
+ assertTrue(client.tableExists(factName));
+ Table cubeTbl = client.getHiveTable(factName);
+ assertTrue(client.isFactTable(cubeTbl));
+ assertTrue(client.isFactTableForCube(cubeTbl, CUBE_NAME));
+
+ // Assert for storage tables
+ for (String entry : storageTables.keySet()) {
+ String storageTableName = getFactOrDimtableStorageTableName(factName, entry);
+ assertTrue(client.tableExists(storageTableName));
+ }
+
+ String c1TableNameHourly = getFactOrDimtableStorageTableName(cubeFact.getName(), HOURLY + "_" + c1);
+ String c2TableNameHourly = getFactOrDimtableStorageTableName(cubeFact.getName(), c2);
+
+ Table c1TableHourly = client.getHiveTable(c1TableNameHourly);
+ c1TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, getDatePartitionKey()),
+ StoreAllPartitionTimeline.class.getCanonicalName());
+ c1TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, itPart.getName()),
+ StoreAllPartitionTimeline.class.getCanonicalName());
+ c1TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, etPart.getName()),
+ StoreAllPartitionTimeline.class.getCanonicalName());
+ client.pushHiveTable(c1TableHourly);
+
+ Table c2TableHourly = client.getHiveTable(c2TableNameHourly);
+ c2TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, getDatePartitionKey()),
+ EndsAndHolesPartitionTimeline.class.getCanonicalName());
+ c2TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, itPart.getName()),
+ EndsAndHolesPartitionTimeline.class.getCanonicalName());
+ c2TableHourly.getParameters().put(getPartitionTimelineStorageClassKey(HOURLY, etPart.getName()),
+ EndsAndHolesPartitionTimeline.class.getCanonicalName());
+ client.pushHiveTable(c2TableHourly);
+
+ assertSameTimelines(factName, new String[] { c1, c2 }, HOURLY, partColNames);
+
+ StoreAllPartitionTimeline timelineDtC1 = ((StoreAllPartitionTimeline) client.partitionTimelineCache
+ .get(factName, c1, HOURLY, getDatePartitionKey()));
+ StoreAllPartitionTimeline timelineItC1 = ((StoreAllPartitionTimeline) client.partitionTimelineCache
+ .get(factName, c1, HOURLY, itPart.getName()));
+ StoreAllPartitionTimeline timelineEtC1 = ((StoreAllPartitionTimeline) client.partitionTimelineCache
+ .get(factName, c1, HOURLY, etPart.getName()));
+ EndsAndHolesPartitionTimeline timelineDt = ((EndsAndHolesPartitionTimeline) client.partitionTimelineCache
+ .get(factName, c2, HOURLY, getDatePartitionKey()));
+ EndsAndHolesPartitionTimeline timelineIt = ((EndsAndHolesPartitionTimeline) client.partitionTimelineCache
+ .get(factName, c2, HOURLY, itPart.getName()));
+ EndsAndHolesPartitionTimeline timelineEt = ((EndsAndHolesPartitionTimeline) client.partitionTimelineCache
+ .get(factName, c2, HOURLY, etPart.getName()));
+
+ StoreAllPartitionTimeline timelineC1 = ((StoreAllPartitionTimeline) client.partitionTimelineCache
+ .get(factName, c1, HOURLY, getDatePartitionKey()));
+
+ Map<String, Date> timeParts1 = getTimePartitionByOffsets(getDatePartitionKey(), 0, itPart.getName(), 0,
+ etPart.getName(), 0);
+ StoragePartitionDesc partSpec1 = new StoragePartitionDesc(cubeFact.getName(), timeParts1, null, HOURLY);
+
+ Map<String, Date> timeParts2 = getTimePartitionByOffsets(getDatePartitionKey(), 0, etPart.getName(), 1);
+ Map<String, String> nonTimeSpec = getHashMap(itPart.getName(), "default");
+ final StoragePartitionDesc partSpec2 = new StoragePartitionDesc(cubeFact.getName(), timeParts2, nonTimeSpec,
+ HOURLY);
+
+ Map<String, Date> timeParts3 = getTimePartitionByOffsets(getDatePartitionKey(), 0, etPart.getName(), 0);
+ final StoragePartitionDesc partSpec3 = new StoragePartitionDesc(cubeFact.getName(), timeParts3, nonTimeSpec,
+ HOURLY);
+
+ client.addPartitions(Arrays.asList(partSpec1, partSpec2, partSpec3), c1, CubeTableType.FACT);
+ client.addPartitions(Arrays.asList(partSpec1, partSpec2, partSpec3), c2, CubeTableType.FACT);
+ PartitionTimeline timeline1Temp = client.partitionTimelineCache.get(factName, c1, HOURLY, getDatePartitionKey());
+ PartitionTimeline timeline2Temp = client.partitionTimelineCache.get(factName, c2, HOURLY, getDatePartitionKey());
+
+ assertEquals(timeline1Temp.getClass(), StoreAllPartitionTimeline.class);
+ assertEquals(timeline2Temp.getClass(), EndsAndHolesPartitionTimeline.class);
+
+ assertEquals(client.getAllParts(c1TableNameHourly).size(), 3);
+ assertEquals(client.getAllParts(c2TableNameHourly).size(), 3);
+
+ assertSameTimelines(factName, new String[] { c1, c2 }, HOURLY, partColNames);
+
+ assertTimeline(timelineDt, timelineDtC1, HOURLY, 0, 0);
+ assertTimeline(timelineEt, timelineEtC1, HOURLY, 0, 1);
+ assertTimeline(timelineIt, timelineItC1, HOURLY, 0, 0);
+
+ assertTrue(client.latestPartitionExists(cubeFact.getName(), c1, getDatePartitionKey()));
+ assertTrue(client.latestPartitionExists(cubeFact.getName(), c1, itPart.getName()));
+ assertTrue(client.latestPartitionExists(cubeFact.getName(), c2, etPart.getName()));
+
+ assertNoPartitionNamedLatest(c1TableNameHourly, partColNames);
+ assertNoPartitionNamedLatest(c2TableNameHourly, partColNames);
+
+ client.dropFact(factName, true);
+ assertFalse(client.tableExists(factName));
+ for (String entry : storageTables.keySet()) {
+ String storageTableName = getFactOrDimtableStorageTableName(factName, entry);
+ assertFalse(client.tableExists(storageTableName));
+ }
+ }
+
@Test(priority = 2)
public void testAlterDerivedCube() throws Exception {
String name = "alter_derived_cube";
@@ -1238,7 +1368,10 @@ public class TestCubeMetastoreClient {
s1.setFieldDelim(":");
storageTables.put(c1, s1);
storageTables.put(c4, s1);
- factTable.addStorage(c4, hourlyAndDaily);
+ Map<UpdatePeriod, String> updatePeriodStoragePrefix = new HashMap<>();
+ updatePeriodStoragePrefix.put(HOURLY, c4);
+ updatePeriodStoragePrefix.put(DAILY, c4);
+ factTable.addStorage(c4, hourlyAndDaily, updatePeriodStoragePrefix);
client.alterCubeFactTable(factName, factTable, storageTables, new HashMap<String, String>());
CubeFactTable altered2 = client.getCubeFact(factName);
assertTrue(client.tableExists(c1TableName));
@@ -1261,7 +1394,12 @@ public class TestCubeMetastoreClient {
assertTrue(client.tableExists(c4TableName));
// add storage
- client.addStorage(altered2, c3, hourlyAndDaily, s1);
+ updatePeriodStoragePrefix.clear();
+ updatePeriodStoragePrefix.put(HOURLY, c3);
+ updatePeriodStoragePrefix.put(DAILY, c3);
+ Map<String, StorageTableDesc> storageTableDescMap = new HashMap<>();
+ storageTableDescMap.put(c3, s1);
+ client.addStorage(altered2, c3, hourlyAndDaily, storageTableDescMap, updatePeriodStoragePrefix);
CubeFactTable altered3 = client.getCubeFact(factName);
assertTrue(altered3.getStorages().contains("C3"));
assertTrue(altered3.getUpdatePeriods().get("C3").equals(hourlyAndDaily));
@@ -1517,14 +1655,16 @@ public class TestCubeMetastoreClient {
for (Partition partition : c1Parts) {
partition.setLocation("blah");
partition.setBucketCount(random.nextInt());
- client.updatePartition(factName, c1, partition);
+ client.updatePartition(factName, c1, partition, HOURLY);
}
assertSamePartitions(client.getAllParts(c1TableName), c1Parts);
for (Partition partition : c2Parts) {
partition.setLocation("blah");
partition.setBucketCount(random.nextInt());
}
- client.updatePartitions(factName, c2, c2Parts);
+ Map<UpdatePeriod, List<Partition>> partitionMap = new HashMap<>();
+ partitionMap.put(HOURLY, c2Parts);
+ client.updatePartitions(factName, c2, partitionMap);
assertSamePartitions(client.getAllParts(c2TableName), c2Parts);
assertSameTimelines(factName, storages, HOURLY, partColNames);
@@ -1998,7 +2138,6 @@ public class TestCubeMetastoreClient {
timePartCols);
Map<String, Set<UpdatePeriod>> updatePeriods = getHashMap(c1, updates);
Map<String, StorageTableDesc> storageTables = getHashMap(c1, s1);
-
CubeFactTable cubeFactWithParts = new CubeFactTable(CUBE_NAME, factNameWithPart, factColumns, updatePeriods);
// create cube fact
http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
index 8b10d1d..24660e1 100644
--- a/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
@@ -238,7 +238,7 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
JAXBUtils.dumpPeriodsFromStorageTables(xDimTable.getStorageTables());
Map<String, String> properties = JAXBUtils.mapFromXProperties(xDimTable.getProperties());
- Map<String, StorageTableDesc> storageDesc = JAXBUtils.storageTableMapFromXStorageTables(
+ Map<String, StorageTableDesc> storageDesc = JAXBUtils.tableDescPrefixMapFromXStorageTables(
xDimTable.getStorageTables());
try (SessionContext ignored = new SessionContext(sessionid)){
@@ -289,7 +289,7 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
try (SessionContext ignored = new SessionContext(sessionid)){
getClient(sessionid).alterCubeDimensionTable(dimensionTable.getTableName(),
JAXBUtils.cubeDimTableFromDimTable(dimensionTable),
- JAXBUtils.storageTableMapFromXStorageTables(dimensionTable.getStorageTables()));
+ JAXBUtils.tableDescPrefixMapFromXStorageTables(dimensionTable.getStorageTables()));
log.info("Updated dimension table " + dimensionTable.getTableName());
} catch (HiveException exc) {
throw new LensException(exc);
@@ -398,15 +398,38 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
CubeMetastoreClient msClient = getClient(sessionid);
CubeFactTable cft = msClient.getFactTable(fact);
XFactTable factTable = JAXBUtils.factTableFromCubeFactTable(cft);
+ Map<String, Map<UpdatePeriod, String>> storageMap = cft.getStoragePrefixUpdatePeriodMap();
for (String storageName : cft.getStorages()) {
Set<UpdatePeriod> updatePeriods = cft.getUpdatePeriods().get(storageName);
- XStorageTableElement tblElement = JAXBUtils.getXStorageTableFromHiveTable(
- msClient.getHiveTable(MetastoreUtil.getFactOrDimtableStorageTableName(fact, storageName)));
- tblElement.setStorageName(storageName);
- for (UpdatePeriod p : updatePeriods) {
- tblElement.getUpdatePeriods().getUpdatePeriod().add(XUpdatePeriod.valueOf(p.name()));
+ // This map tells if there are different tables for different update period.
+ Map<UpdatePeriod, String> updatePeriodToTableMap = storageMap.get(storageName);
+ Set<String> tableNames = new HashSet<>();
+ for (UpdatePeriod updatePeriod : updatePeriods) {
+ tableNames.add(updatePeriodToTableMap.get(updatePeriod));
+ }
+ if (tableNames.size() <= 1) {
+ XStorageTableElement tblElement = JAXBUtils.getXStorageTableFromHiveTable(
+ msClient.getHiveTable(MetastoreUtil.getFactOrDimtableStorageTableName(fact, storageName)));
+ tblElement.setStorageName(storageName);
+ for (UpdatePeriod p : updatePeriods) {
+ tblElement.getUpdatePeriods().getUpdatePeriod().add(XUpdatePeriod.valueOf(p.name()));
+ }
+ factTable.getStorageTables().getStorageTable().add(tblElement);
+ } else {
+ // Multiple storage tables.
+ XStorageTableElement tblElement = new XStorageTableElement();
+ tblElement.setStorageName(storageName);
+ XUpdatePeriods xUpdatePeriods = new XUpdatePeriods();
+ tblElement.setUpdatePeriods(xUpdatePeriods);
+ for (Map.Entry entry : updatePeriodToTableMap.entrySet()) {
+ XUpdatePeriodTableDescriptor updatePeriodTableDescriptor = new XUpdatePeriodTableDescriptor();
+ updatePeriodTableDescriptor.setTableDesc(getStorageTableDescFromHiveTable(
+ msClient.getHiveTable(MetastoreUtil.getFactOrDimtableStorageTableName(fact, (String) entry.getValue()))));
+ updatePeriodTableDescriptor.setUpdatePeriod(XUpdatePeriod.valueOf(((UpdatePeriod)entry.getKey()).name()));
+ xUpdatePeriods.getUpdatePeriodTableDescriptor().add(updatePeriodTableDescriptor);
+ }
+ factTable.getStorageTables().getStorageTable().add(tblElement);
}
- factTable.getStorageTables().getStorageTable().add(tblElement);
}
return factTable;
}
@@ -431,7 +454,8 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
JAXBUtils.getFactUpdatePeriodsFromStorageTables(fact.getStorageTables()),
fact.getWeight(),
addFactColStartTimePropertyToFactProperties(fact),
- JAXBUtils.storageTableMapFromXStorageTables(fact.getStorageTables()));
+ JAXBUtils.tableDescPrefixMapFromXStorageTables(fact.getStorageTables()),
+ JAXBUtils.storageTablePrefixMapOfStorage(fact.getStorageTables()));
log.info("Created fact table " + fact.getName());
}
}
@@ -460,7 +484,7 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
public void updateFactTable(LensSessionHandle sessionid, XFactTable fact) throws LensException {
try (SessionContext ignored = new SessionContext(sessionid)){
getClient(sessionid).alterCubeFactTable(fact.getName(), JAXBUtils.cubeFactFromFactTable(fact),
- JAXBUtils.storageTableMapFromXStorageTables(fact.getStorageTables()),
+ JAXBUtils.tableDescPrefixMapFromXStorageTables(fact.getStorageTables()),
JAXBUtils.columnStartAndEndTimeFromXColumns(fact.getColumns()));
log.info("Updated fact table " + fact.getName());
} catch (HiveException e) {
@@ -587,11 +611,13 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
for (XUpdatePeriod sup : storageTable.getUpdatePeriods().getUpdatePeriod()) {
updatePeriods.add(UpdatePeriod.valueOf(sup.name()));
}
- try (SessionContext ignored = new SessionContext(sessionid)){
+ try (SessionContext ignored = new SessionContext(sessionid)) {
CubeMetastoreClient msClient = getClient(sessionid);
- msClient.addStorage(msClient.getFactTable(fact),
- storageTable.getStorageName(), updatePeriods,
- JAXBUtils.storageTableDescFromXStorageTableElement(storageTable));
+ XStorageTables tables = new XStorageTables();
+ tables.getStorageTable().add(storageTable);
+ msClient.addStorage(msClient.getFactTable(fact), storageTable.getStorageName(), updatePeriods,
+ JAXBUtils.tableDescPrefixMapFromXStorageTables(tables),
+ JAXBUtils.storageTablePrefixMapOfStorage(tables).get(storageTable.getStorageName()));
log.info("Added storage " + storageTable.getStorageName() + ":" + updatePeriods + " for fact " + fact);
}
}
@@ -615,17 +641,34 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
return factTable;
}
+ private Set<String> getAllTablesForStorage(LensSessionHandle sessionHandle, String fact, String storageName)
+ throws LensException {
+ Set<String> storageTableNames = new HashSet<>();
+ if (getClient(sessionHandle).isFactTable(fact)) {
+ CubeFactTable cft = getClient(sessionHandle).getCubeFact(fact);
+ Map<UpdatePeriod, String> storageMap = cft.getStoragePrefixUpdatePeriodMap().get(storageName);
+ for (Map.Entry entry : storageMap.entrySet()) {
+ storageTableNames.add(MetastoreUtil.getStorageTableName(fact, Storage.getPrefix((String) entry.getValue())));
+ }
+ } else {
+ storageTableNames.add(MetastoreUtil.getFactOrDimtableStorageTableName(fact, storageName));
+ }
+ return storageTableNames;
+ }
+
@Override
- public XPartitionList getAllPartitionsOfFactStorage(
- LensSessionHandle sessionid, String fact, String storageName,
+ public XPartitionList getAllPartitionsOfFactStorage(LensSessionHandle sessionid, String fact, String storageName,
String filter) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
+ try (SessionContext ignored = new SessionContext(sessionid)) {
checkFactStorage(sessionid, fact, storageName);
CubeMetastoreClient client = getClient(sessionid);
- String storageTableName = MetastoreUtil.getFactOrDimtableStorageTableName(fact,
- storageName);
- List<Partition> parts = client.getPartitionsByFilter(storageTableName, filter);
- List<String> timePartCols = client.getTimePartColNamesOfTable(storageTableName);
+ Set<String> storageTableNames = getAllTablesForStorage(sessionid, fact, storageName);
+ List<Partition> parts = new ArrayList<>();
+ List<String> timePartCols = new ArrayList<>();
+ for (String storageTableName : storageTableNames) {
+ parts.addAll(client.getPartitionsByFilter(storageTableName, filter));
+ timePartCols.addAll(client.getTimePartColNamesOfTable(storageTableName));
+ }
return xpartitionListFromPartitionList(fact, parts, timePartCols);
} catch (HiveException exc) {
throw new LensException(exc);
@@ -635,10 +678,10 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
@Override
public int addPartitionToFactStorage(LensSessionHandle sessionid, String fact, String storageName,
XPartition partition) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
+ try (SessionContext ignored = new SessionContext(sessionid)) {
checkFactStorage(sessionid, fact, storageName);
- return getClient(sessionid).addPartition(storagePartSpecFromXPartition(partition), storageName,
- CubeTableType.FACT).size();
+ return getClient(sessionid)
+ .addPartition(storagePartSpecFromXPartition(partition), storageName, CubeTableType.FACT).size();
} catch (HiveException exc) {
throw new LensException(exc);
}
@@ -647,10 +690,10 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
@Override
public int addPartitionsToFactStorage(LensSessionHandle sessionid, String fact, String storageName,
XPartitionList partitions) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
+ try (SessionContext ignored = new SessionContext(sessionid)) {
checkFactStorage(sessionid, fact, storageName);
- return getClient(sessionid).addPartitions(storagePartSpecListFromXPartitionList(partitions), storageName,
- CubeTableType.FACT).size();
+ return getClient(sessionid)
+ .addPartitions(storagePartSpecListFromXPartitionList(partitions), storageName, CubeTableType.FACT).size();
} catch (HiveException exc) {
throw new LensException(exc);
}
@@ -693,15 +736,17 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
}
@Override
- public void updatePartition(LensSessionHandle sessionid, String tblName, String storageName,
- XPartition xPartition) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
+ public void updatePartition(LensSessionHandle sessionid, String tblName, String storageName, XPartition xPartition)
+ throws LensException {
+ try (SessionContext ignored = new SessionContext(sessionid)) {
CubeMetastoreClient client = getClient(sessionid);
- String storageTableName = MetastoreUtil.getFactOrDimtableStorageTableName(tblName, storageName);
+ String storageTableName = client
+ .getStorageTableName(tblName, storageName, UpdatePeriod.valueOf(xPartition.getUpdatePeriod().name()));
Partition existingPartition = client.getPartitionByFilter(storageTableName,
StorageConstants.getPartFilter(JAXBUtils.getFullPartSpecAsMap(xPartition)));
JAXBUtils.updatePartitionFromXPartition(existingPartition, xPartition);
- client.updatePartition(tblName, storageName, existingPartition);
+ client.updatePartition(tblName, storageName, existingPartition,
+ UpdatePeriod.valueOf(xPartition.getUpdatePeriod().value()));
} catch (HiveException | ClassNotFoundException | InvalidOperationException | UnsupportedOperationException exc) {
throw new LensException(exc);
}
@@ -710,15 +755,23 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
@Override
public void updatePartitions(LensSessionHandle sessionid, String tblName, String storageName,
XPartitionList xPartitions) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
+ try (SessionContext ignored = new SessionContext(sessionid)) {
CubeMetastoreClient client = getClient(sessionid);
- String storageTableName = MetastoreUtil.getFactOrDimtableStorageTableName(tblName, storageName);
- List<Partition> partitionsToUpdate = new ArrayList<>(xPartitions.getPartition().size());
- for (XPartition xPartition : xPartitions.getPartition()) {
- Partition existingPartition = client.getPartitionByFilter(storageTableName,
- StorageConstants.getPartFilter(JAXBUtils.getFullPartSpecAsMap(xPartition)));
- JAXBUtils.updatePartitionFromXPartition(existingPartition, xPartition);
- partitionsToUpdate.add(existingPartition);
+ Set<String> storageTableNames = getAllTablesForStorage(sessionid, tblName, storageName);
+ Map<UpdatePeriod, List<Partition>> partitionsToUpdate = new HashMap<>();
+ for (String storageTableName : storageTableNames) {
+ for (XPartition xPartition : xPartitions.getPartition()) {
+ Partition existingPartition = client.getPartitionByFilter(storageTableName,
+ StorageConstants.getPartFilter(JAXBUtils.getFullPartSpecAsMap(xPartition)));
+ JAXBUtils.updatePartitionFromXPartition(existingPartition, xPartition);
+ UpdatePeriod updatePeriod = UpdatePeriod.valueOf(xPartition.getUpdatePeriod().value());
+ List<Partition> partitionList = partitionsToUpdate.get(updatePeriod);
+ if (partitionList == null) {
+ partitionList = new ArrayList<>();
+ partitionsToUpdate.put(updatePeriod, partitionList);
+ }
+ partitionList.add(existingPartition);
+ }
}
client.updatePartitions(tblName, storageName, partitionsToUpdate);
} catch (HiveException | ClassNotFoundException | InvalidOperationException exc) {
@@ -787,29 +840,35 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
return period;
}
- public void dropPartitionFromStorageByValues(LensSessionHandle sessionid,
- String cubeTableName, String storageName, String values) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
- String tableName = MetastoreUtil.getStorageTableName(cubeTableName,
- Storage.getPrefix(storageName));
+ public void dropPartitionFromStorageByValues(LensSessionHandle sessionid, String cubeTableName, String storageName,
+ String values) throws LensException {
+ try (SessionContext ignored = new SessionContext(sessionid)) {
+ Set<String> storageTables = getAllTablesForStorage(sessionid, cubeTableName, storageName);
+ Map<String, List<Partition>> partitions = new HashMap<>();
CubeMetastoreClient msClient = getClient(sessionid);
- String filter = getFilter(msClient, tableName, values);
- List<Partition> partitions = msClient.getPartitionsByFilter(
- tableName, filter);
- if (partitions.size() > 1) {
- log.error("More than one partition with specified values, correspoding filter:" + filter);
- throw new BadRequestException("More than one partition with specified values");
- } else if (partitions.size() == 0) {
- log.error("No partition exists with specified values, correspoding filter:" + filter);
+ int totalPartitions = 0;
+ Partition part = null;
+ for (String tableName : storageTables) {
+ String filter = getFilter(msClient, tableName, values);
+ partitions.put(filter, msClient.getPartitionsByFilter(tableName, filter));
+ if (partitions.get(filter).size() > 1) {
+ log.error("More than one partition with specified values, corresponding filter:" + filter);
+ throw new BadRequestException("More than one partition with specified values");
+ }
+ if (partitions.get(filter).size() == 1) {
+ part = partitions.get(filter).get(0);
+ }
+ totalPartitions += partitions.get(filter).size();
+ }
+ if (totalPartitions == 0) {
+ log.error("No partition exists with specified values");
throw new NotFoundException("No partition exists with specified values");
}
Map<String, Date> timeSpec = new HashMap<>();
Map<String, String> nonTimeSpec = new HashMap<>();
- UpdatePeriod updatePeriod = populatePartSpec(partitions.get(0), timeSpec, nonTimeSpec);
- msClient.dropPartition(cubeTableName,
- storageName, timeSpec, nonTimeSpec, updatePeriod);
- log.info("Dropped partition for dimension: " + cubeTableName
- + " storage: " + storageName + " values:" + values);
+ UpdatePeriod updatePeriod = populatePartSpec(part, timeSpec, nonTimeSpec);
+ msClient.dropPartition(cubeTableName, storageName, timeSpec, nonTimeSpec, updatePeriod);
+ log.info("Dropped partition for dimension: " + cubeTableName + " storage: " + storageName + " values:" + values);
} catch (HiveException exc) {
throw new LensException(exc);
}
@@ -818,9 +877,12 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
public void dropPartitionFromStorageByFilter(LensSessionHandle sessionid, String cubeTableName,
String storageName, String filter) throws LensException {
try (SessionContext ignored = new SessionContext(sessionid)){
- String tableName = MetastoreUtil.getStorageTableName(cubeTableName, Storage.getPrefix(storageName));
+ Set<String> storageTables = getAllTablesForStorage(sessionid, cubeTableName, storageName);
+ List<Partition> partitions = new ArrayList<>();
CubeMetastoreClient msClient = getClient(sessionid);
- List<Partition> partitions = msClient.getPartitionsByFilter(tableName, filter);
+ for (String tableName : storageTables) {
+ partitions.addAll(msClient.getPartitionsByFilter(tableName, filter));
+ }
for (Partition part : partitions) {
try {
Map<String, Date> timeSpec = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/lens/blob/f0dadd79/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java b/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java
index 51fcb43..0bc8e77 100644
--- a/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java
+++ b/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.mapred.InputFormat;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
-
import lombok.extern.slf4j.Slf4j;
/**
@@ -588,14 +587,22 @@ public final class JAXBUtils {
return cols;
}
- public static Map<String, Set<UpdatePeriod>> getFactUpdatePeriodsFromStorageTables(
- XStorageTables storageTables) {
+ public static Map<String, Set<UpdatePeriod>> getFactUpdatePeriodsFromStorageTables(XStorageTables storageTables) {
if (storageTables != null && !storageTables.getStorageTable().isEmpty()) {
Map<String, Set<UpdatePeriod>> factUpdatePeriods = new LinkedHashMap<String, Set<UpdatePeriod>>();
for (XStorageTableElement ste : storageTables.getStorageTable()) {
- Set<UpdatePeriod> updatePeriods = new TreeSet<UpdatePeriod>();
- for (XUpdatePeriod upd : ste.getUpdatePeriods().getUpdatePeriod()) {
+ Set<UpdatePeriod> updatePeriods = new TreeSet<>();
+ // Check if the update period array is empty.
+ List<XUpdatePeriod> updatePeriodList = ste.getUpdatePeriods().getUpdatePeriod();
+ if (updatePeriodList.isEmpty()) {
+ List<XUpdatePeriodTableDescriptor> tableDescriptorList = ste.getUpdatePeriods()
+ .getUpdatePeriodTableDescriptor();
+ for (XUpdatePeriodTableDescriptor tableDescriptor : tableDescriptorList) {
+ updatePeriodList.add(tableDescriptor.getUpdatePeriod());
+ }
+ }
+ for (XUpdatePeriod upd : updatePeriodList) {
updatePeriods.add(UpdatePeriod.valueOf(upd.name()));
}
factUpdatePeriods.put(ste.getStorageName(), updatePeriods);
@@ -706,13 +713,10 @@ public final class JAXBUtils {
Map<String, Set<UpdatePeriod>> storageUpdatePeriods = getFactUpdatePeriodsFromStorageTables(
fact.getStorageTables());
-
- return new CubeFactTable(fact.getCubeName(),
- fact.getName(),
- columns,
- storageUpdatePeriods,
- fact.getWeight(),
- mapFromXProperties(fact.getProperties()));
+ Map<String, Map<UpdatePeriod, String>> storageTablePrefixMap = storageTablePrefixMapOfStorage(
+ fact.getStorageTables());
+ return new CubeFactTable(fact.getCubeName(), fact.getName(), columns, storageUpdatePeriods, fact.getWeight(),
+ mapFromXProperties(fact.getProperties()), storageTablePrefixMap);
}
public static Segmentation segmentationFromXSegmentation(XSegmentation seg) throws LensException {
@@ -849,11 +853,43 @@ public final class JAXBUtils {
return tblDesc;
}
- public static Map<String, StorageTableDesc> storageTableMapFromXStorageTables(XStorageTables storageTables) {
- Map<String, StorageTableDesc> storageTableMap = new HashMap<String, StorageTableDesc>();
+ public static Map<String, StorageTableDesc> tableDescPrefixMapFromXStorageTables(XStorageTables storageTables) {
+ Map<String, StorageTableDesc> storageTablePrefixToDescMap = new HashMap<>();
+ if (storageTables != null && !storageTables.getStorageTable().isEmpty()) {
+ for (XStorageTableElement sTbl : storageTables.getStorageTable()) {
+ if (!sTbl.getUpdatePeriods().getUpdatePeriodTableDescriptor().isEmpty()) {
+ for (XUpdatePeriodTableDescriptor updatePeriodTable : sTbl.getUpdatePeriods()
+ .getUpdatePeriodTableDescriptor()) {
+ // Get table name with update period as the prefix.
+ storageTablePrefixToDescMap.put(updatePeriodTable.getUpdatePeriod() + "_" + sTbl.getStorageName(),
+ storageTableDescFromXStorageTableDesc(updatePeriodTable.getTableDesc()));
+ }
+ } else {
+ storageTablePrefixToDescMap.put(sTbl.getStorageName(), storageTableDescFromXStorageTableElement(sTbl));
+ }
+ }
+ }
+ return storageTablePrefixToDescMap;
+ }
+
+ public static Map<String, Map<UpdatePeriod, String>> storageTablePrefixMapOfStorage(XStorageTables storageTables) {
+ Map<String, Map<UpdatePeriod, String>> storageTableMap = new HashMap<>();
if (storageTables != null && !storageTables.getStorageTable().isEmpty()) {
for (XStorageTableElement sTbl : storageTables.getStorageTable()) {
- storageTableMap.put(sTbl.getStorageName(), storageTableDescFromXStorageTableElement(sTbl));
+ Map<UpdatePeriod, String> storageNameMap = new HashMap<>();
+ if (!sTbl.getUpdatePeriods().getUpdatePeriodTableDescriptor().isEmpty()) {
+ for (XUpdatePeriodTableDescriptor updatePeriodTable : sTbl.getUpdatePeriods()
+ .getUpdatePeriodTableDescriptor()) {
+ // Get table name with update period as the prefix.
+ storageNameMap.put(UpdatePeriod.valueOf(updatePeriodTable.getUpdatePeriod().value()),
+ updatePeriodTable.getUpdatePeriod() + "_" + sTbl.getStorageName());
+ }
+ } else {
+ for (XUpdatePeriod updatePeriod :sTbl.getUpdatePeriods().getUpdatePeriod()) {
+ storageNameMap.put(UpdatePeriod.valueOf(updatePeriod.value()), sTbl.getStorageName());
+ }
+ }
+ storageTableMap.put(sTbl.getStorageName(), storageNameMap);
}
}
return storageTableMap;