You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/02/01 19:40:49 UTC
[30/50] [abbrv] hadoop git commit: YARN-4219. New levelDB cache
storage for timeline v1.5. Contributed by Li Lu
YARN-4219. New levelDB cache storage for timeline v1.5. Contributed by
Li Lu
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9fab22b3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9fab22b3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9fab22b3
Branch: refs/heads/HDFS-7240
Commit: 9fab22b36673e7f1a0bb629d2c07966ac2482e99
Parents: 61382ff
Author: Xuan <xg...@apache.org>
Authored: Thu Jan 28 14:24:22 2016 -0800
Committer: Xuan <xg...@apache.org>
Committed: Thu Jan 28 14:24:22 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../hadoop/yarn/conf/YarnConfiguration.java | 7 +
.../src/main/resources/yarn-default.xml | 8 +
.../timeline/KeyValueBasedTimelineStore.java | 574 +++++++++++++++++++
.../server/timeline/MemoryTimelineStore.java | 491 ++--------------
.../timeline/TimelineStoreMapAdapter.java | 60 ++
.../yarn/server/timeline/util/LeveldbUtils.java | 7 +
.../pom.xml | 4 +
.../yarn/server/timeline/EntityCacheItem.java | 3 +-
.../timeline/LevelDBCacheTimelineStore.java | 316 ++++++++++
.../server/timeline/PluginStoreTestUtils.java | 2 +-
.../timeline/TestLevelDBCacheTimelineStore.java | 94 +++
12 files changed, 1114 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 1b57a3d..8eaed42 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -384,6 +384,8 @@ Release 2.8.0 - UNRELEASED
YARN-4265. Provide new timeline plugin storage to support fine-grained entity
caching. (Li Lu and Jason Lowe via junping_du)
+ YARN-4219. New levelDB cache storage for timeline v1.5. (Li Lu via xgong)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index e214a86..d84c155 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1702,6 +1702,13 @@ public class YarnConfiguration extends Configuration {
DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
"2000, 500";
+ public static final String TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX
+ + "leveldb-cache-read-cache-size";
+
+ public static final long
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE = 10 * 1024 * 1024;
+
public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
public static final long
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 6508a2a..e33d23e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2036,6 +2036,14 @@
<value>604800</value>
</property>
+ <property>
+ <name>yarn.timeline-service.entity-group-fs-store.leveldb-cache-read-cache-size</name>
+ <description>
+ Read cache size for the leveldb cache storage in ATS v1.5 plugin storage.
+ </description>
+ <value>10485760</value>
+ </property>
+
<!-- Shared Cache Configuration -->
<property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java
new file mode 100644
index 0000000..79e2bf2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java
@@ -0,0 +1,574 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+
+import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
+
+/**
+ * Map based implementation of {@link TimelineStore}. A hash map
+ * implementation should be connected to this implementation through a
+ * {@link TimelineStoreMapAdapter}.
+ *
+ * The methods are synchronized to avoid concurrent modifications.
+ *
+ */
+@Private
+@Unstable
+abstract class KeyValueBasedTimelineStore
+ extends AbstractService implements TimelineStore {
+
+ protected TimelineStoreMapAdapter<EntityIdentifier, TimelineEntity> entities;
+ protected TimelineStoreMapAdapter<EntityIdentifier, Long> entityInsertTimes;
+ protected TimelineStoreMapAdapter<String, TimelineDomain> domainById;
+ protected TimelineStoreMapAdapter<String, Set<TimelineDomain>> domainsByOwner;
+
+ private boolean serviceStopped = false;
+
+ private static final Log LOG
+ = LogFactory.getLog(KeyValueBasedTimelineStore.class);
+
+ public KeyValueBasedTimelineStore() {
+ super(KeyValueBasedTimelineStore.class.getName());
+ }
+
+ public KeyValueBasedTimelineStore(String name) {
+ super(name);
+ }
+
+ public synchronized boolean getServiceStopped() {
+ return serviceStopped;
+ }
+
+ @Override
+ protected synchronized void serviceStop() throws Exception {
+ serviceStopped = true;
+ super.serviceStop();
+ }
+
+ @Override
+ public synchronized TimelineEntities getEntities(String entityType, Long limit,
+ Long windowStart, Long windowEnd, String fromId, Long fromTs,
+ NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+ EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
+ if (getServiceStopped()) {
+ LOG.info("Service stopped, return null for the storage");
+ return null;
+ }
+ if (limit == null) {
+ limit = DEFAULT_LIMIT;
+ }
+ if (windowStart == null) {
+ windowStart = Long.MIN_VALUE;
+ }
+ if (windowEnd == null) {
+ windowEnd = Long.MAX_VALUE;
+ }
+ if (fields == null) {
+ fields = EnumSet.allOf(Field.class);
+ }
+
+ Iterator<TimelineEntity> entityIterator = null;
+ if (fromId != null) {
+ TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
+ entityType));
+ if (firstEntity == null) {
+ return new TimelineEntities();
+ } else {
+ entityIterator = entities.valueSetIterator(firstEntity);
+ }
+ }
+ if (entityIterator == null) {
+ entityIterator = entities.valueSetIterator();
+ }
+
+ List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
+ while (entityIterator.hasNext()) {
+ TimelineEntity entity = entityIterator.next();
+ if (entitiesSelected.size() >= limit) {
+ break;
+ }
+ if (!entity.getEntityType().equals(entityType)) {
+ continue;
+ }
+ if (entity.getStartTime() <= windowStart) {
+ continue;
+ }
+ if (entity.getStartTime() > windowEnd) {
+ continue;
+ }
+ if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
+ entity.getEntityId(), entity.getEntityType())) > fromTs) {
+ continue;
+ }
+ if (primaryFilter != null &&
+ !KeyValueBasedTimelineStoreUtils.matchPrimaryFilter(
+ entity.getPrimaryFilters(), primaryFilter)) {
+ continue;
+ }
+ if (secondaryFilters != null) { // AND logic
+ boolean flag = true;
+ for (NameValuePair secondaryFilter : secondaryFilters) {
+ if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils
+ .matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter)
+ && !KeyValueBasedTimelineStoreUtils.matchFilter(
+ entity.getOtherInfo(), secondaryFilter)) {
+ flag = false;
+ break;
+ }
+ }
+ if (!flag) {
+ continue;
+ }
+ }
+ if (entity.getDomainId() == null) {
+ entity.setDomainId(DEFAULT_DOMAIN_ID);
+ }
+ if (checkAcl == null || checkAcl.check(entity)) {
+ entitiesSelected.add(entity);
+ }
+ }
+ List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
+ for (TimelineEntity entitySelected : entitiesSelected) {
+ entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields(
+ entitySelected, fields));
+ }
+ Collections.sort(entitiesToReturn);
+ TimelineEntities entitiesWrapper = new TimelineEntities();
+ entitiesWrapper.setEntities(entitiesToReturn);
+ return entitiesWrapper;
+ }
+
+ @Override
+ public synchronized TimelineEntity getEntity(String entityId, String entityType,
+ EnumSet<Field> fieldsToRetrieve) {
+ if (getServiceStopped()) {
+ LOG.info("Service stopped, return null for the storage");
+ return null;
+ }
+ if (fieldsToRetrieve == null) {
+ fieldsToRetrieve = EnumSet.allOf(Field.class);
+ }
+ TimelineEntity
+ entity = entities.get(new EntityIdentifier(entityId, entityType));
+ if (entity == null) {
+ return null;
+ } else {
+ return KeyValueBasedTimelineStoreUtils.maskFields(
+ entity, fieldsToRetrieve);
+ }
+ }
+
+ @Override
+ public synchronized TimelineEvents getEntityTimelines(String entityType,
+ SortedSet<String> entityIds, Long limit, Long windowStart,
+ Long windowEnd,
+ Set<String> eventTypes) {
+ if (getServiceStopped()) {
+ LOG.info("Service stopped, return null for the storage");
+ return null;
+ }
+ TimelineEvents allEvents = new TimelineEvents();
+ if (entityIds == null) {
+ return allEvents;
+ }
+ if (limit == null) {
+ limit = DEFAULT_LIMIT;
+ }
+ if (windowStart == null) {
+ windowStart = Long.MIN_VALUE;
+ }
+ if (windowEnd == null) {
+ windowEnd = Long.MAX_VALUE;
+ }
+ for (String entityId : entityIds) {
+ EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
+ TimelineEntity entity = entities.get(entityID);
+ if (entity == null) {
+ continue;
+ }
+ EventsOfOneEntity events = new EventsOfOneEntity();
+ events.setEntityId(entityId);
+ events.setEntityType(entityType);
+ for (TimelineEvent event : entity.getEvents()) {
+ if (events.getEvents().size() >= limit) {
+ break;
+ }
+ if (event.getTimestamp() <= windowStart) {
+ continue;
+ }
+ if (event.getTimestamp() > windowEnd) {
+ continue;
+ }
+ if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
+ continue;
+ }
+ events.addEvent(event);
+ }
+ allEvents.addEvent(events);
+ }
+ return allEvents;
+ }
+
+ @Override
+ public TimelineDomain getDomain(String domainId)
+ throws IOException {
+ if (getServiceStopped()) {
+ LOG.info("Service stopped, return null for the storage");
+ return null;
+ }
+ TimelineDomain domain = domainById.get(domainId);
+ if (domain == null) {
+ return null;
+ } else {
+ return KeyValueBasedTimelineStoreUtils.createTimelineDomain(
+ domain.getId(),
+ domain.getDescription(),
+ domain.getOwner(),
+ domain.getReaders(),
+ domain.getWriters(),
+ domain.getCreatedTime(),
+ domain.getModifiedTime());
+ }
+ }
+
+ @Override
+ public TimelineDomains getDomains(String owner)
+ throws IOException {
+ if (getServiceStopped()) {
+ LOG.info("Service stopped, return null for the storage");
+ return null;
+ }
+ List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
+ Set<TimelineDomain> domainsOfOneOwner = domainsByOwner.get(owner);
+ if (domainsOfOneOwner == null) {
+ return new TimelineDomains();
+ }
+ for (TimelineDomain domain : domainsByOwner.get(owner)) {
+ TimelineDomain domainToReturn = KeyValueBasedTimelineStoreUtils
+ .createTimelineDomain(
+ domain.getId(),
+ domain.getDescription(),
+ domain.getOwner(),
+ domain.getReaders(),
+ domain.getWriters(),
+ domain.getCreatedTime(),
+ domain.getModifiedTime());
+ domains.add(domainToReturn);
+ }
+ Collections.sort(domains, new Comparator<TimelineDomain>() {
+ @Override
+ public int compare(
+ TimelineDomain domain1, TimelineDomain domain2) {
+ int result = domain2.getCreatedTime().compareTo(
+ domain1.getCreatedTime());
+ if (result == 0) {
+ return domain2.getModifiedTime().compareTo(
+ domain1.getModifiedTime());
+ } else {
+ return result;
+ }
+ }
+ });
+ TimelineDomains domainsToReturn = new TimelineDomains();
+ domainsToReturn.addDomains(domains);
+ return domainsToReturn;
+ }
+
+ @Override
+ public synchronized TimelinePutResponse put(TimelineEntities data) {
+ TimelinePutResponse response = new TimelinePutResponse();
+ if (getServiceStopped()) {
+ LOG.info("Service stopped, return null for the storage");
+ TimelinePutError error = new TimelinePutError();
+ error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+ response.addError(error);
+ return response;
+ }
+ for (TimelineEntity entity : data.getEntities()) {
+ EntityIdentifier entityId =
+ new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
+ // store entity info in memory
+ TimelineEntity existingEntity = entities.get(entityId);
+ boolean needsPut = false;
+ if (existingEntity == null) {
+ existingEntity = new TimelineEntity();
+ existingEntity.setEntityId(entity.getEntityId());
+ existingEntity.setEntityType(entity.getEntityType());
+ existingEntity.setStartTime(entity.getStartTime());
+ if (entity.getDomainId() == null ||
+ entity.getDomainId().length() == 0) {
+ TimelinePutError error = new TimelinePutError();
+ error.setEntityId(entityId.getId());
+ error.setEntityType(entityId.getType());
+ error.setErrorCode(TimelinePutError.NO_DOMAIN);
+ response.addError(error);
+ continue;
+ }
+ existingEntity.setDomainId(entity.getDomainId());
+ // insert a new entity to the storage, update insert time map
+ entityInsertTimes.put(entityId, System.currentTimeMillis());
+ needsPut = true;
+ }
+ if (entity.getEvents() != null) {
+ if (existingEntity.getEvents() == null) {
+ existingEntity.setEvents(entity.getEvents());
+ } else {
+ existingEntity.addEvents(entity.getEvents());
+ }
+ Collections.sort(existingEntity.getEvents());
+ needsPut = true;
+ }
+ // check startTime
+ if (existingEntity.getStartTime() == null) {
+ if (existingEntity.getEvents() == null
+ || existingEntity.getEvents().isEmpty()) {
+ TimelinePutError error = new TimelinePutError();
+ error.setEntityId(entityId.getId());
+ error.setEntityType(entityId.getType());
+ error.setErrorCode(TimelinePutError.NO_START_TIME);
+ response.addError(error);
+ entities.remove(entityId);
+ entityInsertTimes.remove(entityId);
+ continue;
+ } else {
+ Long min = Long.MAX_VALUE;
+ for (TimelineEvent e : entity.getEvents()) {
+ if (min > e.getTimestamp()) {
+ min = e.getTimestamp();
+ }
+ }
+ existingEntity.setStartTime(min);
+ needsPut = true;
+ }
+ }
+ if (entity.getPrimaryFilters() != null) {
+ if (existingEntity.getPrimaryFilters() == null) {
+ existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
+ }
+ for (Entry<String, Set<Object>> pf :
+ entity.getPrimaryFilters().entrySet()) {
+ for (Object pfo : pf.getValue()) {
+ existingEntity.addPrimaryFilter(pf.getKey(),
+ KeyValueBasedTimelineStoreUtils.compactNumber(pfo));
+ needsPut = true;
+ }
+ }
+ }
+ if (entity.getOtherInfo() != null) {
+ if (existingEntity.getOtherInfo() == null) {
+ existingEntity.setOtherInfo(new HashMap<String, Object>());
+ }
+ for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
+ existingEntity.addOtherInfo(info.getKey(),
+ KeyValueBasedTimelineStoreUtils.compactNumber(info.getValue()));
+ needsPut = true;
+ }
+ }
+ if (needsPut) {
+ entities.put(entityId, existingEntity);
+ }
+
+ // relate it to other entities
+ if (entity.getRelatedEntities() == null) {
+ continue;
+ }
+ for (Entry<String, Set<String>> partRelatedEntities : entity
+ .getRelatedEntities().entrySet()) {
+ if (partRelatedEntities == null) {
+ continue;
+ }
+ for (String idStr : partRelatedEntities.getValue()) {
+ EntityIdentifier relatedEntityId =
+ new EntityIdentifier(idStr, partRelatedEntities.getKey());
+ TimelineEntity relatedEntity = entities.get(relatedEntityId);
+ if (relatedEntity != null) {
+ if (relatedEntity.getDomainId().equals(
+ existingEntity.getDomainId())) {
+ relatedEntity.addRelatedEntity(
+ existingEntity.getEntityType(), existingEntity.getEntityId());
+ entities.put(relatedEntityId, relatedEntity);
+ } else {
+ // in this case the entity will be put, but the relation will be
+ // ignored
+ TimelinePutError error = new TimelinePutError();
+ error.setEntityType(existingEntity.getEntityType());
+ error.setEntityId(existingEntity.getEntityId());
+ error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
+ response.addError(error);
+ }
+ } else {
+ relatedEntity = new TimelineEntity();
+ relatedEntity.setEntityId(relatedEntityId.getId());
+ relatedEntity.setEntityType(relatedEntityId.getType());
+ relatedEntity.setStartTime(existingEntity.getStartTime());
+ relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
+ existingEntity.getEntityId());
+ relatedEntity.setDomainId(existingEntity.getDomainId());
+ entities.put(relatedEntityId, relatedEntity);
+ entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
+ }
+ }
+ }
+ }
+ return response;
+ }
+
+ public void put(TimelineDomain domain) throws IOException {
+ if (getServiceStopped()) {
+ LOG.info("Service stopped, return null for the storage");
+ return;
+ }
+ TimelineDomain domainToReplace =
+ domainById.get(domain.getId());
+ Long currentTimestamp = System.currentTimeMillis();
+ TimelineDomain domainToStore
+ = KeyValueBasedTimelineStoreUtils.createTimelineDomain(
+ domain.getId(), domain.getDescription(), domain.getOwner(),
+ domain.getReaders(), domain.getWriters(),
+ (domainToReplace == null ?
+ currentTimestamp : domainToReplace.getCreatedTime()),
+ currentTimestamp);
+ domainById.put(domainToStore.getId(), domainToStore);
+ Set<TimelineDomain> domainsByOneOwner =
+ domainsByOwner.get(domainToStore.getOwner());
+ if (domainsByOneOwner == null) {
+ domainsByOneOwner = new HashSet<TimelineDomain>();
+ domainsByOwner.put(domainToStore.getOwner(), domainsByOneOwner);
+ }
+ if (domainToReplace != null) {
+ domainsByOneOwner.remove(domainToReplace);
+ }
+ domainsByOneOwner.add(domainToStore);
+ }
+
+ private static class KeyValueBasedTimelineStoreUtils {
+
+ static TimelineDomain createTimelineDomain(
+ String id, String description, String owner,
+ String readers, String writers,
+ Long createdTime, Long modifiedTime) {
+ TimelineDomain domainToStore = new TimelineDomain();
+ domainToStore.setId(id);
+ domainToStore.setDescription(description);
+ domainToStore.setOwner(owner);
+ domainToStore.setReaders(readers);
+ domainToStore.setWriters(writers);
+ domainToStore.setCreatedTime(createdTime);
+ domainToStore.setModifiedTime(modifiedTime);
+ return domainToStore;
+ }
+
+ static TimelineEntity maskFields(
+ TimelineEntity entity, EnumSet<Field> fields) {
+ // Conceal the fields that are not going to be exposed
+ TimelineEntity entityToReturn = new TimelineEntity();
+ entityToReturn.setEntityId(entity.getEntityId());
+ entityToReturn.setEntityType(entity.getEntityType());
+ entityToReturn.setStartTime(entity.getStartTime());
+ entityToReturn.setDomainId(entity.getDomainId());
+ // Deep copy
+ if (fields.contains(Field.EVENTS)) {
+ entityToReturn.addEvents(entity.getEvents());
+ } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
+ entityToReturn.addEvent(entity.getEvents().get(0));
+ } else {
+ entityToReturn.setEvents(null);
+ }
+ if (fields.contains(Field.RELATED_ENTITIES)) {
+ entityToReturn.addRelatedEntities(entity.getRelatedEntities());
+ } else {
+ entityToReturn.setRelatedEntities(null);
+ }
+ if (fields.contains(Field.PRIMARY_FILTERS)) {
+ entityToReturn.addPrimaryFilters(entity.getPrimaryFilters());
+ } else {
+ entityToReturn.setPrimaryFilters(null);
+ }
+ if (fields.contains(Field.OTHER_INFO)) {
+ entityToReturn.addOtherInfo(entity.getOtherInfo());
+ } else {
+ entityToReturn.setOtherInfo(null);
+ }
+ return entityToReturn;
+ }
+
+ static boolean matchFilter(Map<String, Object> tags,
+ NameValuePair filter) {
+ Object value = tags.get(filter.getName());
+ if (value == null) { // doesn't have the filter
+ return false;
+ } else if (!value.equals(filter.getValue())) { // doesn't match the filter
+ return false;
+ }
+ return true;
+ }
+
+ static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
+ NameValuePair filter) {
+ Set<Object> value = tags.get(filter.getName());
+ if (value == null) { // doesn't have the filter
+ return false;
+ } else {
+ return value.contains(filter.getValue());
+ }
+ }
+
+ static Object compactNumber(Object o) {
+ if (o instanceof Long) {
+ Long l = (Long) o;
+ if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
+ return l.intValue();
+ }
+ }
+ return o;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
index 3489114..5c2db00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
@@ -18,39 +18,14 @@
package org.apache.hadoop.yarn.server.timeline;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumSet;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.SortedSet;
import java.util.TreeSet;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
-import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
-
-import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
-
/**
* In-memory implementation of {@link TimelineStore}. This
* implementation is for test purpose only. If users improperly instantiate it,
@@ -62,448 +37,60 @@ import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT
*/
@Private
@Unstable
-public class MemoryTimelineStore
- extends AbstractService implements TimelineStore {
-
- private Map<EntityIdentifier, TimelineEntity> entities =
- new HashMap<EntityIdentifier, TimelineEntity>();
- private Map<EntityIdentifier, Long> entityInsertTimes =
- new HashMap<EntityIdentifier, Long>();
- private Map<String, TimelineDomain> domainsById =
- new HashMap<String, TimelineDomain>();
- private Map<String, Set<TimelineDomain>> domainsByOwner =
- new HashMap<String, Set<TimelineDomain>>();
+public class MemoryTimelineStore extends KeyValueBasedTimelineStore {
- public MemoryTimelineStore() {
- super(MemoryTimelineStore.class.getName());
- }
+ static class HashMapStoreAdapter<K, V>
+ implements TimelineStoreMapAdapter<K, V> {
+ Map<K, V> internalMap = new HashMap<>();
- @Override
- public synchronized TimelineEntities getEntities(String entityType, Long limit,
- Long windowStart, Long windowEnd, String fromId, Long fromTs,
- NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
- EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
- if (limit == null) {
- limit = DEFAULT_LIMIT;
- }
- if (windowStart == null) {
- windowStart = Long.MIN_VALUE;
- }
- if (windowEnd == null) {
- windowEnd = Long.MAX_VALUE;
- }
- if (fields == null) {
- fields = EnumSet.allOf(Field.class);
+ @Override
+ public V get(K key) {
+ return internalMap.get(key);
}
- Iterator<TimelineEntity> entityIterator = null;
- if (fromId != null) {
- TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
- entityType));
- if (firstEntity == null) {
- return new TimelineEntities();
- } else {
- entityIterator = new TreeSet<TimelineEntity>(entities.values())
- .tailSet(firstEntity, true).iterator();
- }
- }
- if (entityIterator == null) {
- entityIterator = new PriorityQueue<TimelineEntity>(entities.values())
- .iterator();
+ @Override
+ public void put(K key, V value) {
+ internalMap.put(key, value);
}
- List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
- while (entityIterator.hasNext()) {
- TimelineEntity entity = entityIterator.next();
- if (entitiesSelected.size() >= limit) {
- break;
- }
- if (!entity.getEntityType().equals(entityType)) {
- continue;
- }
- if (entity.getStartTime() <= windowStart) {
- continue;
- }
- if (entity.getStartTime() > windowEnd) {
- continue;
- }
- if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
- entity.getEntityId(), entity.getEntityType())) > fromTs) {
- continue;
- }
- if (primaryFilter != null &&
- !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
- continue;
- }
- if (secondaryFilters != null) { // AND logic
- boolean flag = true;
- for (NameValuePair secondaryFilter : secondaryFilters) {
- if (secondaryFilter != null && !matchPrimaryFilter(
- entity.getPrimaryFilters(), secondaryFilter) &&
- !matchFilter(entity.getOtherInfo(), secondaryFilter)) {
- flag = false;
- break;
- }
- }
- if (!flag) {
- continue;
- }
- }
- if (entity.getDomainId() == null) {
- entity.setDomainId(DEFAULT_DOMAIN_ID);
- }
- if (checkAcl == null || checkAcl.check(entity)) {
- entitiesSelected.add(entity);
- }
- }
- List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
- for (TimelineEntity entitySelected : entitiesSelected) {
- entitiesToReturn.add(maskFields(entitySelected, fields));
+ @Override
+ public void remove(K key) {
+ internalMap.remove(key);
}
- Collections.sort(entitiesToReturn);
- TimelineEntities entitiesWrapper = new TimelineEntities();
- entitiesWrapper.setEntities(entitiesToReturn);
- return entitiesWrapper;
- }
-
- @Override
- public synchronized TimelineEntity getEntity(String entityId, String entityType,
- EnumSet<Field> fieldsToRetrieve) {
- if (fieldsToRetrieve == null) {
- fieldsToRetrieve = EnumSet.allOf(Field.class);
- }
- TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
- if (entity == null) {
- return null;
- } else {
- return maskFields(entity, fieldsToRetrieve);
- }
- }
- @Override
- public synchronized TimelineEvents getEntityTimelines(String entityType,
- SortedSet<String> entityIds, Long limit, Long windowStart,
- Long windowEnd,
- Set<String> eventTypes) {
- TimelineEvents allEvents = new TimelineEvents();
- if (entityIds == null) {
- return allEvents;
+ @Override
+ public Iterator<V>
+ valueSetIterator() {
+ return new TreeSet<>(internalMap.values()).iterator();
}
- if (limit == null) {
- limit = DEFAULT_LIMIT;
- }
- if (windowStart == null) {
- windowStart = Long.MIN_VALUE;
- }
- if (windowEnd == null) {
- windowEnd = Long.MAX_VALUE;
- }
- for (String entityId : entityIds) {
- EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
- TimelineEntity entity = entities.get(entityID);
- if (entity == null) {
- continue;
- }
- EventsOfOneEntity events = new EventsOfOneEntity();
- events.setEntityId(entityId);
- events.setEntityType(entityType);
- for (TimelineEvent event : entity.getEvents()) {
- if (events.getEvents().size() >= limit) {
- break;
- }
- if (event.getTimestamp() <= windowStart) {
- continue;
- }
- if (event.getTimestamp() > windowEnd) {
- continue;
- }
- if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
- continue;
- }
- events.addEvent(event);
- }
- allEvents.addEvent(events);
- }
- return allEvents;
- }
- @Override
- public TimelineDomain getDomain(String domainId)
- throws IOException {
- TimelineDomain domain = domainsById.get(domainId);
- if (domain == null) {
- return null;
- } else {
- return createTimelineDomain(
- domain.getId(),
- domain.getDescription(),
- domain.getOwner(),
- domain.getReaders(),
- domain.getWriters(),
- domain.getCreatedTime(),
- domain.getModifiedTime());
- }
- }
-
- @Override
- public TimelineDomains getDomains(String owner)
- throws IOException {
- List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
- Set<TimelineDomain> domainsOfOneOwner = domainsByOwner.get(owner);
- if (domainsOfOneOwner == null) {
- return new TimelineDomains();
- }
- for (TimelineDomain domain : domainsByOwner.get(owner)) {
- TimelineDomain domainToReturn = createTimelineDomain(
- domain.getId(),
- domain.getDescription(),
- domain.getOwner(),
- domain.getReaders(),
- domain.getWriters(),
- domain.getCreatedTime(),
- domain.getModifiedTime());
- domains.add(domainToReturn);
- }
- Collections.sort(domains, new Comparator<TimelineDomain>() {
- @Override
- public int compare(
- TimelineDomain domain1, TimelineDomain domain2) {
- int result = domain2.getCreatedTime().compareTo(
- domain1.getCreatedTime());
- if (result == 0) {
- return domain2.getModifiedTime().compareTo(
- domain1.getModifiedTime());
- } else {
- return result;
- }
- }
- });
- TimelineDomains domainsToReturn = new TimelineDomains();
- domainsToReturn.addDomains(domains);
- return domainsToReturn;
- }
-
- @Override
- public synchronized TimelinePutResponse put(TimelineEntities data) {
- TimelinePutResponse response = new TimelinePutResponse();
- for (TimelineEntity entity : data.getEntities()) {
- EntityIdentifier entityId =
- new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
- // store entity info in memory
- TimelineEntity existingEntity = entities.get(entityId);
- if (existingEntity == null) {
- existingEntity = new TimelineEntity();
- existingEntity.setEntityId(entity.getEntityId());
- existingEntity.setEntityType(entity.getEntityType());
- existingEntity.setStartTime(entity.getStartTime());
- if (entity.getDomainId() == null ||
- entity.getDomainId().length() == 0) {
- TimelinePutError error = new TimelinePutError();
- error.setEntityId(entityId.getId());
- error.setEntityType(entityId.getType());
- error.setErrorCode(TimelinePutError.NO_DOMAIN);
- response.addError(error);
- continue;
- }
- existingEntity.setDomainId(entity.getDomainId());
- entities.put(entityId, existingEntity);
- entityInsertTimes.put(entityId, System.currentTimeMillis());
- }
- if (entity.getEvents() != null) {
- if (existingEntity.getEvents() == null) {
- existingEntity.setEvents(entity.getEvents());
- } else {
- existingEntity.addEvents(entity.getEvents());
- }
- Collections.sort(existingEntity.getEvents());
- }
- // check startTime
- if (existingEntity.getStartTime() == null) {
- if (existingEntity.getEvents() == null
- || existingEntity.getEvents().isEmpty()) {
- TimelinePutError error = new TimelinePutError();
- error.setEntityId(entityId.getId());
- error.setEntityType(entityId.getType());
- error.setErrorCode(TimelinePutError.NO_START_TIME);
- response.addError(error);
- entities.remove(entityId);
- entityInsertTimes.remove(entityId);
- continue;
- } else {
- Long min = Long.MAX_VALUE;
- for (TimelineEvent e : entity.getEvents()) {
- if (min > e.getTimestamp()) {
- min = e.getTimestamp();
- }
- }
- existingEntity.setStartTime(min);
- }
- }
- if (entity.getPrimaryFilters() != null) {
- if (existingEntity.getPrimaryFilters() == null) {
- existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
- }
- for (Entry<String, Set<Object>> pf :
- entity.getPrimaryFilters().entrySet()) {
- for (Object pfo : pf.getValue()) {
- existingEntity.addPrimaryFilter(pf.getKey(), maybeConvert(pfo));
- }
- }
- }
- if (entity.getOtherInfo() != null) {
- if (existingEntity.getOtherInfo() == null) {
- existingEntity.setOtherInfo(new HashMap<String, Object>());
- }
- for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
- existingEntity.addOtherInfo(info.getKey(),
- maybeConvert(info.getValue()));
- }
- }
- // relate it to other entities
- if (entity.getRelatedEntities() == null) {
- continue;
- }
- for (Map.Entry<String, Set<String>> partRelatedEntities : entity
- .getRelatedEntities().entrySet()) {
- if (partRelatedEntities == null) {
- continue;
- }
- for (String idStr : partRelatedEntities.getValue()) {
- EntityIdentifier relatedEntityId =
- new EntityIdentifier(idStr, partRelatedEntities.getKey());
- TimelineEntity relatedEntity = entities.get(relatedEntityId);
- if (relatedEntity != null) {
- if (relatedEntity.getDomainId().equals(
- existingEntity.getDomainId())) {
- relatedEntity.addRelatedEntity(
- existingEntity.getEntityType(), existingEntity.getEntityId());
- } else {
- // in this case the entity will be put, but the relation will be
- // ignored
- TimelinePutError error = new TimelinePutError();
- error.setEntityType(existingEntity.getEntityType());
- error.setEntityId(existingEntity.getEntityId());
- error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
- response.addError(error);
- }
- } else {
- relatedEntity = new TimelineEntity();
- relatedEntity.setEntityId(relatedEntityId.getId());
- relatedEntity.setEntityType(relatedEntityId.getType());
- relatedEntity.setStartTime(existingEntity.getStartTime());
- relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
- existingEntity.getEntityId());
- relatedEntity.setDomainId(existingEntity.getDomainId());
- entities.put(relatedEntityId, relatedEntity);
- entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
+ @Override
+ @SuppressWarnings("unchecked")
+ public Iterator<V> valueSetIterator(V minV) {
+ if (minV instanceof Comparable) {
+ TreeSet<V> tempTreeSet = new TreeSet<>();
+ for (V value : internalMap.values()) {
+ if (((Comparable) value).compareTo(minV) >= 0) {
+ tempTreeSet.add(value);
}
}
+ return tempTreeSet.iterator();
+ } else {
+ return valueSetIterator();
}
}
- return response;
- }
-
- public void put(TimelineDomain domain) throws IOException {
- TimelineDomain domainToReplace =
- domainsById.get(domain.getId());
- Long currentTimestamp = System.currentTimeMillis();
- TimelineDomain domainToStore = createTimelineDomain(
- domain.getId(), domain.getDescription(), domain.getOwner(),
- domain.getReaders(), domain.getWriters(),
- (domainToReplace == null ?
- currentTimestamp : domainToReplace.getCreatedTime()),
- currentTimestamp);
- domainsById.put(domainToStore.getId(), domainToStore);
- Set<TimelineDomain> domainsByOneOwner =
- domainsByOwner.get(domainToStore.getOwner());
- if (domainsByOneOwner == null) {
- domainsByOneOwner = new HashSet<TimelineDomain>();
- domainsByOwner.put(domainToStore.getOwner(), domainsByOneOwner);
- }
- if (domainToReplace != null) {
- domainsByOneOwner.remove(domainToReplace);
- }
- domainsByOneOwner.add(domainToStore);
- }
-
- private static TimelineDomain createTimelineDomain(
- String id, String description, String owner,
- String readers, String writers,
- Long createdTime, Long modifiedTime) {
- TimelineDomain domainToStore = new TimelineDomain();
- domainToStore.setId(id);
- domainToStore.setDescription(description);
- domainToStore.setOwner(owner);
- domainToStore.setReaders(readers);
- domainToStore.setWriters(writers);
- domainToStore.setCreatedTime(createdTime);
- domainToStore.setModifiedTime(modifiedTime);
- return domainToStore;
- }
-
- private static TimelineEntity maskFields(
- TimelineEntity entity, EnumSet<Field> fields) {
- // Conceal the fields that are not going to be exposed
- TimelineEntity entityToReturn = new TimelineEntity();
- entityToReturn.setEntityId(entity.getEntityId());
- entityToReturn.setEntityType(entity.getEntityType());
- entityToReturn.setStartTime(entity.getStartTime());
- entityToReturn.setDomainId(entity.getDomainId());
- // Deep copy
- if (fields.contains(Field.EVENTS)) {
- entityToReturn.addEvents(entity.getEvents());
- } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
- entityToReturn.addEvent(entity.getEvents().get(0));
- } else {
- entityToReturn.setEvents(null);
- }
- if (fields.contains(Field.RELATED_ENTITIES)) {
- entityToReturn.addRelatedEntities(entity.getRelatedEntities());
- } else {
- entityToReturn.setRelatedEntities(null);
- }
- if (fields.contains(Field.PRIMARY_FILTERS)) {
- entityToReturn.addPrimaryFilters(entity.getPrimaryFilters());
- } else {
- entityToReturn.setPrimaryFilters(null);
- }
- if (fields.contains(Field.OTHER_INFO)) {
- entityToReturn.addOtherInfo(entity.getOtherInfo());
- } else {
- entityToReturn.setOtherInfo(null);
- }
- return entityToReturn;
}
- private static boolean matchFilter(Map<String, Object> tags,
- NameValuePair filter) {
- Object value = tags.get(filter.getName());
- if (value == null) { // doesn't have the filter
- return false;
- } else if (!value.equals(filter.getValue())) { // doesn't match the filter
- return false;
- }
- return true;
- }
-
- private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
- NameValuePair filter) {
- Set<Object> value = tags.get(filter.getName());
- if (value == null) { // doesn't have the filter
- return false;
- } else {
- return value.contains(filter.getValue());
- }
+ public MemoryTimelineStore() {
+ this(MemoryTimelineStore.class.getName());
}
- private static Object maybeConvert(Object o) {
- if (o instanceof Long) {
- Long l = (Long)o;
- if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
- return l.intValue();
- }
- }
- return o;
+ public MemoryTimelineStore(String name) {
+ super(name);
+ entities = new HashMapStoreAdapter<>();
+ entityInsertTimes = new HashMapStoreAdapter<>();
+ domainById = new HashMapStoreAdapter<>();
+ domainsByOwner = new HashMapStoreAdapter<>();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java
new file mode 100644
index 0000000..175ed0b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.util.Iterator;
+
+/**
+ * An adapter for map timeline store implementations
+ * @param <K> the type of the key set
+ * @param <V> the type of the value set
+ */
+interface TimelineStoreMapAdapter<K, V> {
+ /**
+ * @param key
+ * @return map(key)
+ */
+ V get(K key);
+
+ /**
+ * Add mapping key->value in the map
+ * @param key
+ * @param value
+ */
+ void put(K key, V value);
+
+ /**
+ * Remove mapping with key keyToRemove
+ * @param keyToRemove
+ */
+ void remove(K keyToRemove);
+
+ /**
+ * @return the iterator of the value set of the map
+ */
+ Iterator<V> valueSetIterator();
+
+ /**
+ * Return the iterator of the value set of the map, starting from minV if type
+ * V is comparable.
+ * @param minV
+ * @return
+ */
+ Iterator<V> valueSetIterator(V minV);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java
index 5638581..82c7f26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.timeline.util;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.WritableComparator;
import java.io.IOException;
@@ -177,4 +178,10 @@ public class LeveldbUtils {
prefixlen) == 0;
}
+ /**
+ * Default permission mask for the level db dir
+ */
+ public static final FsPermission LEVELDB_DIR_UMASK = FsPermission
+ .createImmutable((short) 0700);
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
index 385ba5d..71f76d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
@@ -132,5 +132,9 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
index 37a1d8d..efbf994 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
@@ -102,7 +102,8 @@ public class EntityCacheItem {
}
if (!appLogs.getDetailLogs().isEmpty()) {
if (store == null) {
- store = new MemoryTimelineStore();
+ store = new LevelDBCacheTimelineStore(groupId.toString(),
+ "LeveldbCache." + groupId);
store.init(config);
store.start();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java
new file mode 100644
index 0000000..976241f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * LevelDB implementation of {@link KeyValueBasedTimelineStore}. This
+ * implementation stores the entity hash map into a LevelDB instance.
+ * There are two partitions of the key space. One partition is to store a
+ * entity id to start time mapping:
+ *
+ * i!ENTITY_ID!ENTITY_TYPE -> ENTITY_START_TIME
+ *
+ * The other partition is to store the actual data:
+ *
+ * e!START_TIME!ENTITY_ID!ENTITY_TYPE -> ENTITY_BYTES
+ *
+ * This storage does not have any garbage collection mechanism, and is designed
+ * mainly for caching usages.
+ */
+@Private
+@Unstable
+public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore {
+ private static final Log LOG
+ = LogFactory.getLog(LevelDBCacheTimelineStore.class);
+ private static final String CACHED_LDB_FILE_PREFIX = "-timeline-cache.ldb";
+ private String dbId;
+ private DB entityDb;
+ private Configuration configuration;
+
+ public LevelDBCacheTimelineStore(String id, String name) {
+ super(name);
+ dbId = id;
+ entityInsertTimes = new MemoryTimelineStore.HashMapStoreAdapter<>();
+ domainById = new MemoryTimelineStore.HashMapStoreAdapter<>();
+ domainsByOwner = new MemoryTimelineStore.HashMapStoreAdapter<>();
+ }
+
+ public LevelDBCacheTimelineStore(String id) {
+ this(id, LevelDBCacheTimelineStore.class.getName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ configuration = conf;
+ Options options = new Options();
+ options.createIfMissing(true);
+ options.cacheSize(conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE,
+ YarnConfiguration.
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE));
+ JniDBFactory factory = new JniDBFactory();
+ Path dbPath = new Path(
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
+ dbId + CACHED_LDB_FILE_PREFIX);
+ FileSystem localFS = null;
+
+ try {
+ localFS = FileSystem.getLocal(conf);
+ if (!localFS.exists(dbPath)) {
+ if (!localFS.mkdirs(dbPath)) {
+ throw new IOException("Couldn't create directory for leveldb " +
+ "timeline store " + dbPath);
+ }
+ localFS.setPermission(dbPath, LeveldbUtils.LEVELDB_DIR_UMASK);
+ }
+ } finally {
+ IOUtils.cleanup(LOG, localFS);
+ }
+ LOG.info("Using leveldb path " + dbPath);
+ entityDb = factory.open(new File(dbPath.toString()), options);
+ entities = new LevelDBMapAdapter<>(entityDb);
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected synchronized void serviceStop() throws Exception {
+ IOUtils.cleanup(LOG, entityDb);
+ Path dbPath = new Path(
+ configuration.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
+ dbId + CACHED_LDB_FILE_PREFIX);
+ FileSystem localFS = null;
+ try {
+ localFS = FileSystem.getLocal(configuration);
+ if (!localFS.delete(dbPath, true)) {
+ throw new IOException("Couldn't delete data file for leveldb " +
+ "timeline store " + dbPath);
+ }
+ } finally {
+ IOUtils.cleanup(LOG, localFS);
+ }
+ super.serviceStop();
+ }
+
+ /**
+ * A specialized hash map storage that uses LevelDB for storing entity id to
+ * entity mappings.
+ *
+ * @param <K> an {@link EntityIdentifier} typed hash key
+ * @param <V> a {@link TimelineEntity} typed value
+ */
+ static class LevelDBMapAdapter<K extends EntityIdentifier,
+ V extends TimelineEntity> implements TimelineStoreMapAdapter<K, V> {
+ private static final String TIME_INDEX_PREFIX = "i";
+ private static final String ENTITY_STORAGE_PREFIX = "e";
+ DB entityDb;
+
+ public LevelDBMapAdapter(DB currLevelDb) {
+ entityDb = currLevelDb;
+ }
+
+ @Override
+ public V get(K entityId) {
+ V result = null;
+ // Read the start time from the index
+ byte[] startTimeBytes = entityDb.get(getStartTimeKey(entityId));
+ if (startTimeBytes == null) {
+ return null;
+ }
+
+ // Build the key for the entity storage and read it
+ try {
+ result = getEntityForKey(getEntityKey(entityId, startTimeBytes));
+ } catch (IOException e) {
+ LOG.error("GenericObjectMapper cannot read key from key "
+ + entityId.toString()
+ + " into an object. Read aborted! ");
+ LOG.error(e.getMessage());
+ }
+
+ return result;
+ }
+
+ @Override
+ public void put(K entityId, V entity) {
+ Long startTime = entity.getStartTime();
+ if (startTime == null) {
+ startTime = System.currentTimeMillis();
+ }
+ // Build the key for the entity storage and read it
+ byte[] startTimeBytes = GenericObjectMapper.writeReverseOrderedLong(
+ startTime);
+ try {
+ byte[] valueBytes = GenericObjectMapper.write(entity);
+ entityDb.put(getEntityKey(entityId, startTimeBytes), valueBytes);
+ } catch (IOException e) {
+ LOG.error("GenericObjectMapper cannot write "
+ + entity.getClass().getName()
+ + " into a byte array. Write aborted! ");
+ LOG.error(e.getMessage());
+ }
+
+ // Build the key for the start time index
+ entityDb.put(getStartTimeKey(entityId), startTimeBytes);
+ }
+
+ @Override
+ public void remove(K entityId) {
+ // Read the start time from the index (key starts with an "i") then delete
+ // the record
+ LeveldbUtils.KeyBuilder startTimeKeyBuilder
+ = LeveldbUtils.KeyBuilder.newInstance();
+ startTimeKeyBuilder.add(TIME_INDEX_PREFIX).add(entityId.getId())
+ .add(entityId.getType());
+ byte[] startTimeBytes = entityDb.get(startTimeKeyBuilder.getBytes());
+ if (startTimeBytes == null) {
+ return;
+ }
+ entityDb.delete(startTimeKeyBuilder.getBytes());
+
+ // Build the key for the entity storage and delete it
+ entityDb.delete(getEntityKey(entityId, startTimeBytes));
+ }
+
+ @Override
+ public Iterator<V> valueSetIterator() {
+ return getIterator(null, Long.MAX_VALUE);
+ }
+
+ @Override
+ public Iterator<V> valueSetIterator(V minV) {
+ return getIterator(
+ new EntityIdentifier(minV.getEntityId(), minV.getEntityType()),
+ minV.getStartTime());
+ }
+
+ private Iterator<V> getIterator(
+ EntityIdentifier startId, long startTimeMax) {
+
+ final DBIterator internalDbIterator = entityDb.iterator();
+
+ // we need to iterate from the first element with key greater than or
+ // equal to ENTITY_STORAGE_PREFIX!maxTS(!startId), but stop on the first
+ // key who does not have prefix ENTITY_STORATE_PREFIX
+
+ // decide end prefix
+ LeveldbUtils.KeyBuilder entityPrefixKeyBuilder
+ = LeveldbUtils.KeyBuilder.newInstance();
+ entityPrefixKeyBuilder.add(ENTITY_STORAGE_PREFIX);
+ final byte[] prefixBytes = entityPrefixKeyBuilder.getBytesForLookup();
+ // decide start prefix on top of end prefix and seek
+ final byte[] startTimeBytes
+ = GenericObjectMapper.writeReverseOrderedLong(startTimeMax);
+ entityPrefixKeyBuilder.add(startTimeBytes, true);
+ if (startId != null) {
+ entityPrefixKeyBuilder.add(startId.getId());
+ }
+ final byte[] startPrefixBytes
+ = entityPrefixKeyBuilder.getBytesForLookup();
+ internalDbIterator.seek(startPrefixBytes);
+
+ return new Iterator<V>() {
+ @Override
+ public boolean hasNext() {
+ if (!internalDbIterator.hasNext()) {
+ return false;
+ }
+ Map.Entry<byte[], byte[]> nextEntry = internalDbIterator.peekNext();
+ if (LeveldbUtils.prefixMatches(
+ prefixBytes, prefixBytes.length, nextEntry.getKey())) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public V next() {
+ if (hasNext()) {
+ Map.Entry<byte[], byte[]> nextRaw = internalDbIterator.next();
+ try {
+ V result = getEntityForKey(nextRaw.getKey());
+ return result;
+ } catch (IOException e) {
+ LOG.error("GenericObjectMapper cannot read key from key "
+ + nextRaw.getKey()
+ + " into an object. Read aborted! ");
+ LOG.error(e.getMessage());
+ }
+ }
+ return null;
+ }
+
+ // We do not support remove operations within one iteration
+ @Override
+ public void remove() {
+ LOG.error("LevelDB map adapter does not support iterate-and-remove"
+ + " use cases. ");
+ }
+ };
+ }
+
+ @SuppressWarnings("unchecked")
+ private V getEntityForKey(byte[] key) throws IOException {
+ byte[] resultRaw = entityDb.get(key);
+ if (resultRaw == null) {
+ return null;
+ }
+ ObjectMapper entityMapper = new ObjectMapper();
+ return (V) entityMapper.readValue(resultRaw, TimelineEntity.class);
+ }
+
+ private byte[] getStartTimeKey(K entityId) {
+ LeveldbUtils.KeyBuilder startTimeKeyBuilder
+ = LeveldbUtils.KeyBuilder.newInstance();
+ startTimeKeyBuilder.add(TIME_INDEX_PREFIX).add(entityId.getId())
+ .add(entityId.getType());
+ return startTimeKeyBuilder.getBytes();
+ }
+
+ private byte[] getEntityKey(K entityId, byte[] startTimeBytes) {
+ LeveldbUtils.KeyBuilder entityKeyBuilder
+ = LeveldbUtils.KeyBuilder.newInstance();
+ entityKeyBuilder.add(ENTITY_STORAGE_PREFIX).add(startTimeBytes, true)
+ .add(entityId.getId()).add(entityId.getType());
+ return entityKeyBuilder.getBytes();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
index e0379b1..f529b59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
@@ -200,7 +200,7 @@ class PluginStoreTestUtils {
}
static TimelineDataManager getTdmWithMemStore(Configuration config) {
- TimelineStore store = new MemoryTimelineStore();
+ TimelineStore store = new MemoryTimelineStore("MemoryStore.test");
TimelineDataManager tdm = getTdmWithStore(config, store);
return tdm;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fab22b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLevelDBCacheTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLevelDBCacheTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLevelDBCacheTimelineStore.java
new file mode 100644
index 0000000..66da1e0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLevelDBCacheTimelineStore.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestLevelDBCacheTimelineStore extends TimelineStoreTestUtils {
+
+ @Before
+ public void setup() throws Exception {
+ store = new LevelDBCacheTimelineStore("app1");
+ store.init(new YarnConfiguration());
+ store.start();
+ loadTestEntityData();
+ loadVerificationEntityData();
+ loadTestDomainData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ store.stop();
+ }
+
+ public TimelineStore getTimelineStore() {
+ return store;
+ }
+
+ @Test
+ public void testGetSingleEntity() throws IOException {
+ super.testGetSingleEntity();
+ }
+
+ @Test
+ public void testGetEntities() throws IOException {
+ super.testGetEntities();
+ }
+
+ @Test
+ public void testGetEntitiesWithFromId() throws IOException {
+ super.testGetEntitiesWithFromId();
+ }
+
+ @Test
+ public void testGetEntitiesWithFromTs() throws IOException {
+ super.testGetEntitiesWithFromTs();
+ }
+
+ @Test
+ public void testGetEntitiesWithPrimaryFilters() throws IOException {
+ super.testGetEntitiesWithPrimaryFilters();
+ }
+
+ @Test
+ public void testGetEntitiesWithSecondaryFilters() throws IOException {
+ super.testGetEntitiesWithSecondaryFilters();
+ }
+
+ @Test
+ public void testGetEvents() throws IOException {
+ super.testGetEvents();
+ }
+
+ @Test
+ public void testGetDomain() throws IOException {
+ super.testGetDomain();
+ }
+
+ @Test
+ public void testGetDomains() throws IOException {
+ super.testGetDomains();
+ }
+
+}