You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/01 21:03:42 UTC
[14/22] ambari git commit: AMBARI-5707. Renaming a module. (swagle)
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ContainerStartDataPBImpl.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ContainerStartDataPBImpl.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ContainerStartDataPBImpl.java
deleted file mode 100644
index 6d248b2..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ContainerStartDataPBImpl.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
-import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto;
-import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProtoOrBuilder;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
-
-import com.google.protobuf.TextFormat;
-
-public class ContainerStartDataPBImpl extends ContainerStartData {
-
- ContainerStartDataProto proto = ContainerStartDataProto.getDefaultInstance();
- ContainerStartDataProto.Builder builder = null;
- boolean viaProto = false;
-
- private ContainerId containerId;
- private Resource resource;
- private NodeId nodeId;
- private Priority priority;
-
- public ContainerStartDataPBImpl() {
- builder = ContainerStartDataProto.newBuilder();
- }
-
- public ContainerStartDataPBImpl(ContainerStartDataProto proto) {
- this.proto = proto;
- viaProto = true;
- }
-
- @Override
- public ContainerId getContainerId() {
- if (this.containerId != null) {
- return this.containerId;
- }
- ContainerStartDataProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasContainerId()) {
- return null;
- }
- this.containerId = convertFromProtoFormat(p.getContainerId());
- return this.containerId;
- }
-
- @Override
- public void setContainerId(ContainerId containerId) {
- maybeInitBuilder();
- if (containerId == null) {
- builder.clearContainerId();
- }
- this.containerId = containerId;
- }
-
- @Override
- public Resource getAllocatedResource() {
- if (this.resource != null) {
- return this.resource;
- }
- ContainerStartDataProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasAllocatedResource()) {
- return null;
- }
- this.resource = convertFromProtoFormat(p.getAllocatedResource());
- return this.resource;
- }
-
- @Override
- public void setAllocatedResource(Resource resource) {
- maybeInitBuilder();
- if (resource == null) {
- builder.clearAllocatedResource();
- }
- this.resource = resource;
- }
-
- @Override
- public NodeId getAssignedNode() {
- if (this.nodeId != null) {
- return this.nodeId;
- }
- ContainerStartDataProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasAssignedNodeId()) {
- return null;
- }
- this.nodeId = convertFromProtoFormat(p.getAssignedNodeId());
- return this.nodeId;
- }
-
- @Override
- public void setAssignedNode(NodeId nodeId) {
- maybeInitBuilder();
- if (nodeId == null) {
- builder.clearAssignedNodeId();
- }
- this.nodeId = nodeId;
- }
-
- @Override
- public Priority getPriority() {
- if (this.priority != null) {
- return this.priority;
- }
- ContainerStartDataProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasPriority()) {
- return null;
- }
- this.priority = convertFromProtoFormat(p.getPriority());
- return this.priority;
- }
-
- @Override
- public void setPriority(Priority priority) {
- maybeInitBuilder();
- if (priority == null) {
- builder.clearPriority();
- }
- this.priority = priority;
- }
-
- @Override
- public long getStartTime() {
- ContainerStartDataProtoOrBuilder p = viaProto ? proto : builder;
- return p.getStartTime();
- }
-
- @Override
- public void setStartTime(long startTime) {
- maybeInitBuilder();
- builder.setStartTime(startTime);
- }
-
- public ContainerStartDataProto getProto() {
- mergeLocalToProto();
- proto = viaProto ? proto : builder.build();
- viaProto = true;
- return proto;
- }
-
- @Override
- public int hashCode() {
- return getProto().hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null)
- return false;
- if (other.getClass().isAssignableFrom(this.getClass())) {
- return this.getProto().equals(this.getClass().cast(other).getProto());
- }
- return false;
- }
-
- @Override
- public String toString() {
- return TextFormat.shortDebugString(getProto());
- }
-
- private void mergeLocalToBuilder() {
- if (this.containerId != null
- && !((ContainerIdPBImpl) this.containerId).getProto().equals(
- builder.getContainerId())) {
- builder.setContainerId(convertToProtoFormat(this.containerId));
- }
- if (this.resource != null
- && !((ResourcePBImpl) this.resource).getProto().equals(
- builder.getAllocatedResource())) {
- builder.setAllocatedResource(convertToProtoFormat(this.resource));
- }
- if (this.nodeId != null
- && !((NodeIdPBImpl) this.nodeId).getProto().equals(
- builder.getAssignedNodeId())) {
- builder.setAssignedNodeId(convertToProtoFormat(this.nodeId));
- }
- if (this.priority != null
- && !((PriorityPBImpl) this.priority).getProto().equals(
- builder.getPriority())) {
- builder.setPriority(convertToProtoFormat(this.priority));
- }
- }
-
- private void mergeLocalToProto() {
- if (viaProto) {
- maybeInitBuilder();
- }
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
- }
-
- private void maybeInitBuilder() {
- if (viaProto || builder == null) {
- builder = ContainerStartDataProto.newBuilder(proto);
- }
- viaProto = false;
- }
-
- private ContainerIdProto convertToProtoFormat(ContainerId containerId) {
- return ((ContainerIdPBImpl) containerId).getProto();
- }
-
- private ContainerIdPBImpl
- convertFromProtoFormat(ContainerIdProto containerId) {
- return new ContainerIdPBImpl(containerId);
- }
-
- private ResourceProto convertToProtoFormat(Resource resource) {
- return ((ResourcePBImpl) resource).getProto();
- }
-
- private ResourcePBImpl convertFromProtoFormat(ResourceProto resource) {
- return new ResourcePBImpl(resource);
- }
-
- private NodeIdProto convertToProtoFormat(NodeId nodeId) {
- return ((NodeIdPBImpl) nodeId).getProto();
- }
-
- private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) {
- return new NodeIdPBImpl(nodeId);
- }
-
- private PriorityProto convertToProtoFormat(Priority priority) {
- return ((PriorityPBImpl) priority).getProto();
- }
-
- private PriorityPBImpl convertFromProtoFormat(PriorityProto priority) {
- return new PriorityPBImpl(priority);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/EntityIdentifier.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/EntityIdentifier.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/EntityIdentifier.java
deleted file mode 100644
index 4b202d8..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/EntityIdentifier.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-
-/**
- * The unique identifier for an entity
- */
-@Private
-@Unstable
-public class EntityIdentifier implements Comparable<EntityIdentifier> {
-
- private String id;
- private String type;
-
- public EntityIdentifier(String id, String type) {
- this.id = id;
- this.type = type;
- }
-
- /**
- * Get the entity Id.
- * @return The entity Id.
- */
- public String getId() {
- return id;
- }
-
- /**
- * Get the entity type.
- * @return The entity type.
- */
- public String getType() {
- return type;
- }
-
- @Override
- public int compareTo(EntityIdentifier other) {
- int c = type.compareTo(other.type);
- if (c != 0) return c;
- return id.compareTo(other.id);
- }
-
- @Override
- public int hashCode() {
- // generated by eclipse
- final int prime = 31;
- int result = 1;
- result = prime * result + ((id == null) ? 0 : id.hashCode());
- result = prime * result + ((type == null) ? 0 : type.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- // generated by eclipse
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- EntityIdentifier other = (EntityIdentifier) obj;
- if (id == null) {
- if (other.id != null)
- return false;
- } else if (!id.equals(other.id))
- return false;
- if (type == null) {
- if (other.type != null)
- return false;
- } else if (!type.equals(other.type))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "{ id: " + id + ", type: "+ type + " }";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java
deleted file mode 100644
index b1846a3..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.codehaus.jackson.map.ObjectWriter;
-
-/**
- * A utility class providing methods for serializing and deserializing
- * objects. The {@link #write(Object)} and {@link #read(byte[])} methods are
- * used by the {@link LeveldbTimelineStore} to store and retrieve arbitrary
- * JSON, while the {@link #writeReverseOrderedLong} and {@link
- * #readReverseOrderedLong} methods are used to sort entities in descending
- * start time order.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class GenericObjectMapper {
- private static final byte[] EMPTY_BYTES = new byte[0];
-
- public static final ObjectReader OBJECT_READER;
- public static final ObjectWriter OBJECT_WRITER;
-
- static {
- ObjectMapper mapper = new ObjectMapper();
- OBJECT_READER = mapper.reader(Object.class);
- OBJECT_WRITER = mapper.writer();
- }
-
- /**
- * Serializes an Object into a byte array. Along with {@link #read(byte[])},
- * can be used to serialize an Object and deserialize it into an Object of
- * the same type without needing to specify the Object's type,
- * as long as it is one of the JSON-compatible objects understood by
- * ObjectMapper.
- *
- * @param o An Object
- * @return A byte array representation of the Object
- * @throws IOException if there is a write error
- */
- public static byte[] write(Object o) throws IOException {
- if (o == null) {
- return EMPTY_BYTES;
- }
- return OBJECT_WRITER.writeValueAsBytes(o);
- }
-
- /**
- * Deserializes an Object from a byte array created with
- * {@link #write(Object)}.
- *
- * @param b A byte array
- * @return An Object
- * @throws IOException if there is a read error
- */
- public static Object read(byte[] b) throws IOException {
- return read(b, 0);
- }
-
- /**
- * Deserializes an Object from a byte array at a specified offset, assuming
- * the bytes were created with {@link #write(Object)}.
- *
- * @param b A byte array
- * @param offset Offset into the array
- * @return An Object
- * @throws IOException if there is a read error
- */
- public static Object read(byte[] b, int offset) throws IOException {
- if (b == null || b.length == 0) {
- return null;
- }
- return OBJECT_READER.readValue(b, offset, b.length - offset);
- }
-
- /**
- * Converts a long to a 8-byte array so that lexicographic ordering of the
- * produced byte arrays sort the longs in descending order.
- *
- * @param l A long
- * @return A byte array
- */
- public static byte[] writeReverseOrderedLong(long l) {
- byte[] b = new byte[8];
- return writeReverseOrderedLong(l, b, 0);
- }
-
- public static byte[] writeReverseOrderedLong(long l, byte[] b, int offset) {
- b[offset] = (byte)(0x7f ^ ((l >> 56) & 0xff));
- for (int i = offset+1; i < offset+7; i++) {
- b[i] = (byte)(0xff ^ ((l >> 8*(7-i)) & 0xff));
- }
- b[offset+7] = (byte)(0xff ^ (l & 0xff));
- return b;
- }
-
- /**
- * Reads 8 bytes from an array starting at the specified offset and
- * converts them to a long. The bytes are assumed to have been created
- * with {@link #writeReverseOrderedLong}.
- *
- * @param b A byte array
- * @param offset An offset into the byte array
- * @return A long
- */
- public static long readReverseOrderedLong(byte[] b, int offset) {
- long l = b[offset] & 0xff;
- for (int i = 1; i < 8; i++) {
- l = l << 8;
- l = l | (b[offset+i]&0xff);
- }
- return l ^ 0x7fffffffffffffffl;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
deleted file mode 100644
index edd4842..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
+++ /dev/null
@@ -1,1473 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.collections.map.LRUMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.fusesource.leveldbjni.JniDBFactory;
-import org.iq80.leveldb.DB;
-import org.iq80.leveldb.DBIterator;
-import org.iq80.leveldb.Options;
-import org.iq80.leveldb.ReadOptions;
-import org.iq80.leveldb.WriteBatch;
-import org.iq80.leveldb.WriteOptions;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.readReverseOrderedLong;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
-
-/**
- * <p>An implementation of an application timeline store backed by leveldb.</p>
- *
- * <p>There are three sections of the db, the start time section,
- * the entity section, and the indexed entity section.</p>
- *
- * <p>The start time section is used to retrieve the unique start time for
- * a given entity. Its values each contain a start time while its keys are of
- * the form:</p>
- * <pre>
- * START_TIME_LOOKUP_PREFIX + entity type + entity id</pre>
- *
- * <p>The entity section is ordered by entity type, then entity start time
- * descending, then entity ID. There are four sub-sections of the entity
- * section: events, primary filters, related entities,
- * and other info. The event entries have event info serialized into their
- * values. The other info entries have values corresponding to the values of
- * the other info name/value map for the entry (note the names are contained
- * in the key). All other entries have empty values. The key structure is as
- * follows:</p>
- * <pre>
- * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
- *
- * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
- * EVENTS_COLUMN + reveventtimestamp + eventtype
- *
- * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
- * PRIMARY_FILTERS_COLUMN + name + value
- *
- * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
- * OTHER_INFO_COLUMN + name
- *
- * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
- * RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
- *
- * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
- * INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + relatedentity type +
- * relatedentity id</pre>
- *
- * <p>The indexed entity section contains a primary filter name and primary
- * filter value as the prefix. Within a given name/value, entire entity
- * entries are stored in the same format as described in the entity section
- * above (below, "key" represents any one of the possible entity entry keys
- * described above).</p>
- * <pre>
- * INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
- * key</pre>
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class LeveldbTimelineStore extends AbstractService
- implements TimelineStore {
- private static final Log LOG = LogFactory
- .getLog(LeveldbTimelineStore.class);
-
- private static final String FILENAME = "leveldb-timeline-store.ldb";
-
- private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes();
- private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes();
- private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes();
-
- private static final byte[] EVENTS_COLUMN = "e".getBytes();
- private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes();
- private static final byte[] OTHER_INFO_COLUMN = "i".getBytes();
- private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes();
- private static final byte[] INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN =
- "z".getBytes();
-
- private static final byte[] EMPTY_BYTES = new byte[0];
-
- private Map<EntityIdentifier, StartAndInsertTime> startTimeWriteCache;
- private Map<EntityIdentifier, Long> startTimeReadCache;
-
- /**
- * Per-entity locks are obtained when writing.
- */
- private final LockMap<EntityIdentifier> writeLocks =
- new LockMap<EntityIdentifier>();
-
- private final ReentrantReadWriteLock deleteLock =
- new ReentrantReadWriteLock();
-
- private DB db;
-
- private Thread deletionThread;
-
- public LeveldbTimelineStore() {
- super(LeveldbTimelineStore.class.getName());
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected void serviceInit(Configuration conf) throws Exception {
- Options options = new Options();
- options.createIfMissing(true);
- options.cacheSize(conf.getLong(
- YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
- JniDBFactory factory = new JniDBFactory();
- String path = conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH);
- File p = new File(path);
- if (!p.exists()) {
- if (!p.mkdirs()) {
- throw new IOException("Couldn't create directory for leveldb " +
- "timeline store " + path);
- }
- }
- LOG.info("Using leveldb path " + path);
- db = factory.open(new File(path, FILENAME), options);
- startTimeWriteCache =
- Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
- conf)));
- startTimeReadCache =
- Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize(
- conf)));
-
- if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true)) {
- deletionThread = new EntityDeletionThread(conf);
- deletionThread.start();
- }
-
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStop() throws Exception {
- if (deletionThread != null) {
- deletionThread.interrupt();
- LOG.info("Waiting for deletion thread to complete its current action");
- try {
- deletionThread.join();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for deletion thread to complete," +
- " closing db now", e);
- }
- }
- IOUtils.cleanup(LOG, db);
- super.serviceStop();
- }
-
- private static class StartAndInsertTime {
- final long startTime;
- final long insertTime;
-
- public StartAndInsertTime(long startTime, long insertTime) {
- this.startTime = startTime;
- this.insertTime = insertTime;
- }
- }
-
- private class EntityDeletionThread extends Thread {
- private final long ttl;
- private final long ttlInterval;
-
- public EntityDeletionThread(Configuration conf) {
- ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
- ttlInterval = conf.getLong(
- YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
- LOG.info("Starting deletion thread with ttl " + ttl + " and cycle " +
- "interval " + ttlInterval);
- }
-
- @Override
- public void run() {
- while (true) {
- long timestamp = System.currentTimeMillis() - ttl;
- try {
- discardOldEntities(timestamp);
- Thread.sleep(ttlInterval);
- } catch (IOException e) {
- LOG.error(e);
- } catch (InterruptedException e) {
- LOG.info("Deletion thread received interrupt, exiting");
- break;
- }
- }
- }
- }
-
- private static class LockMap<K> {
- private static class CountingReentrantLock<K> extends ReentrantLock {
- private static final long serialVersionUID = 1L;
- private int count;
- private K key;
-
- CountingReentrantLock(K key) {
- super();
- this.count = 0;
- this.key = key;
- }
- }
-
- private Map<K, CountingReentrantLock<K>> locks =
- new HashMap<K, CountingReentrantLock<K>>();
-
- synchronized CountingReentrantLock<K> getLock(K key) {
- CountingReentrantLock<K> lock = locks.get(key);
- if (lock == null) {
- lock = new CountingReentrantLock<K>(key);
- locks.put(key, lock);
- }
-
- lock.count++;
- return lock;
- }
-
- synchronized void returnLock(CountingReentrantLock<K> lock) {
- if (lock.count == 0) {
- throw new IllegalStateException("Returned lock more times than it " +
- "was retrieved");
- }
- lock.count--;
-
- if (lock.count == 0) {
- locks.remove(lock.key);
- }
- }
- }
-
- private static class KeyBuilder {
- private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
- private byte[][] b;
- private boolean[] useSeparator;
- private int index;
- private int length;
-
- public KeyBuilder(int size) {
- b = new byte[size][];
- useSeparator = new boolean[size];
- index = 0;
- length = 0;
- }
-
- public static KeyBuilder newInstance() {
- return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
- }
-
- public KeyBuilder add(String s) {
- return add(s.getBytes(), true);
- }
-
- public KeyBuilder add(byte[] t) {
- return add(t, false);
- }
-
- public KeyBuilder add(byte[] t, boolean sep) {
- b[index] = t;
- useSeparator[index] = sep;
- length += t.length;
- if (sep) {
- length++;
- }
- index++;
- return this;
- }
-
- public byte[] getBytes() throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
- for (int i = 0; i < index; i++) {
- baos.write(b[i]);
- if (i < index-1 && useSeparator[i]) {
- baos.write(0x0);
- }
- }
- return baos.toByteArray();
- }
-
- public byte[] getBytesForLookup() throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
- for (int i = 0; i < index; i++) {
- baos.write(b[i]);
- if (useSeparator[i]) {
- baos.write(0x0);
- }
- }
- return baos.toByteArray();
- }
- }
-
- private static class KeyParser {
- private final byte[] b;
- private int offset;
-
- public KeyParser(byte[] b, int offset) {
- this.b = b;
- this.offset = offset;
- }
-
- public String getNextString() throws IOException {
- if (offset >= b.length) {
- throw new IOException(
- "tried to read nonexistent string from byte array");
- }
- int i = 0;
- while (offset+i < b.length && b[offset+i] != 0x0) {
- i++;
- }
- String s = new String(b, offset, i);
- offset = offset + i + 1;
- return s;
- }
-
- public long getNextLong() throws IOException {
- if (offset+8 >= b.length) {
- throw new IOException("byte array ran out when trying to read long");
- }
- long l = readReverseOrderedLong(b, offset);
- offset += 8;
- return l;
- }
-
- public int getOffset() {
- return offset;
- }
- }
-
- @Override
- public TimelineEntity getEntity(String entityId, String entityType,
- EnumSet<Field> fields) throws IOException {
- Long revStartTime = getStartTimeLong(entityId, entityType);
- if (revStartTime == null) {
- return null;
- }
- byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
- .add(entityType).add(writeReverseOrderedLong(revStartTime))
- .add(entityId).getBytesForLookup();
-
- DBIterator iterator = null;
- try {
- iterator = db.iterator();
- iterator.seek(prefix);
-
- return getEntity(entityId, entityType, revStartTime, fields, iterator,
- prefix, prefix.length);
- } finally {
- IOUtils.cleanup(LOG, iterator);
- }
- }
-
- /**
- * Read entity from a db iterator. If no information is found in the
- * specified fields for this entity, return null.
- */
- private static TimelineEntity getEntity(String entityId, String entityType,
- Long startTime, EnumSet<Field> fields, DBIterator iterator,
- byte[] prefix, int prefixlen) throws IOException {
- if (fields == null) {
- fields = EnumSet.allOf(Field.class);
- }
-
- TimelineEntity entity = new TimelineEntity();
- boolean events = false;
- boolean lastEvent = false;
- if (fields.contains(Field.EVENTS)) {
- events = true;
- } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
- lastEvent = true;
- } else {
- entity.setEvents(null);
- }
- boolean relatedEntities = false;
- if (fields.contains(Field.RELATED_ENTITIES)) {
- relatedEntities = true;
- } else {
- entity.setRelatedEntities(null);
- }
- boolean primaryFilters = false;
- if (fields.contains(Field.PRIMARY_FILTERS)) {
- primaryFilters = true;
- } else {
- entity.setPrimaryFilters(null);
- }
- boolean otherInfo = false;
- if (fields.contains(Field.OTHER_INFO)) {
- otherInfo = true;
- } else {
- entity.setOtherInfo(null);
- }
-
- // iterate through the entity's entry, parsing information if it is part
- // of a requested field
- for (; iterator.hasNext(); iterator.next()) {
- byte[] key = iterator.peekNext().getKey();
- if (!prefixMatches(prefix, prefixlen, key)) {
- break;
- }
- if (key.length == prefixlen) {
- continue;
- }
- if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
- if (primaryFilters) {
- addPrimaryFilter(entity, key,
- prefixlen + PRIMARY_FILTERS_COLUMN.length);
- }
- } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
- if (otherInfo) {
- entity.addOtherInfo(parseRemainingKey(key,
- prefixlen + OTHER_INFO_COLUMN.length),
- GenericObjectMapper.read(iterator.peekNext().getValue()));
- }
- } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
- if (relatedEntities) {
- addRelatedEntity(entity, key,
- prefixlen + RELATED_ENTITIES_COLUMN.length);
- }
- } else if (key[prefixlen] == EVENTS_COLUMN[0]) {
- if (events || (lastEvent &&
- entity.getEvents().size() == 0)) {
- TimelineEvent event = getEntityEvent(null, key, prefixlen +
- EVENTS_COLUMN.length, iterator.peekNext().getValue());
- if (event != null) {
- entity.addEvent(event);
- }
- }
- } else {
- if (key[prefixlen] !=
- INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
- LOG.warn(String.format("Found unexpected column for entity %s of " +
- "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
- }
- }
- }
-
- entity.setEntityId(entityId);
- entity.setEntityType(entityType);
- entity.setStartTime(startTime);
-
- return entity;
- }
-
- @Override
- public TimelineEvents getEntityTimelines(String entityType,
- SortedSet<String> entityIds, Long limit, Long windowStart,
- Long windowEnd, Set<String> eventType) throws IOException {
- TimelineEvents events = new TimelineEvents();
- if (entityIds == null || entityIds.isEmpty()) {
- return events;
- }
- // create a lexicographically-ordered map from start time to entities
- Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[],
- List<EntityIdentifier>>(new Comparator<byte[]>() {
- @Override
- public int compare(byte[] o1, byte[] o2) {
- return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
- o2.length);
- }
- });
- DBIterator iterator = null;
- try {
- // look up start times for the specified entities
- // skip entities with no start time
- for (String entityId : entityIds) {
- byte[] startTime = getStartTime(entityId, entityType);
- if (startTime != null) {
- List<EntityIdentifier> entities = startTimeMap.get(startTime);
- if (entities == null) {
- entities = new ArrayList<EntityIdentifier>();
- startTimeMap.put(startTime, entities);
- }
- entities.add(new EntityIdentifier(entityId, entityType));
- }
- }
- for (Entry<byte[], List<EntityIdentifier>> entry :
- startTimeMap.entrySet()) {
- // look up the events matching the given parameters (limit,
- // start time, end time, event types) for entities whose start times
- // were found and add the entities to the return list
- byte[] revStartTime = entry.getKey();
- for (EntityIdentifier entityIdentifier : entry.getValue()) {
- EventsOfOneEntity entity = new EventsOfOneEntity();
- entity.setEntityId(entityIdentifier.getId());
- entity.setEntityType(entityType);
- events.addEvent(entity);
- KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
- .add(entityType).add(revStartTime).add(entityIdentifier.getId())
- .add(EVENTS_COLUMN);
- byte[] prefix = kb.getBytesForLookup();
- if (windowEnd == null) {
- windowEnd = Long.MAX_VALUE;
- }
- byte[] revts = writeReverseOrderedLong(windowEnd);
- kb.add(revts);
- byte[] first = kb.getBytesForLookup();
- byte[] last = null;
- if (windowStart != null) {
- last = KeyBuilder.newInstance().add(prefix)
- .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
- }
- if (limit == null) {
- limit = DEFAULT_LIMIT;
- }
- iterator = db.iterator();
- for (iterator.seek(first); entity.getEvents().size() < limit &&
- iterator.hasNext(); iterator.next()) {
- byte[] key = iterator.peekNext().getKey();
- if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
- WritableComparator.compareBytes(key, 0, key.length, last, 0,
- last.length) > 0)) {
- break;
- }
- TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
- iterator.peekNext().getValue());
- if (event != null) {
- entity.addEvent(event);
- }
- }
- }
- }
- } finally {
- IOUtils.cleanup(LOG, iterator);
- }
- return events;
- }
-
- /**
- * Returns true if the byte array begins with the specified prefix.
- */
- private static boolean prefixMatches(byte[] prefix, int prefixlen,
- byte[] b) {
- if (b.length < prefixlen) {
- return false;
- }
- return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0,
- prefixlen) == 0;
- }
-
- @Override
- public TimelineEntities getEntities(String entityType,
- Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
- NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
- EnumSet<Field> fields) throws IOException {
- if (primaryFilter == null) {
- // if no primary filter is specified, prefix the lookup with
- // ENTITY_ENTRY_PREFIX
- return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
- windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields);
- } else {
- // if a primary filter is specified, prefix the lookup with
- // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
- // ENTITY_ENTRY_PREFIX
- byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
- .add(primaryFilter.getName())
- .add(GenericObjectMapper.write(primaryFilter.getValue()), true)
- .add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
- return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
- fromId, fromTs, secondaryFilters, fields);
- }
- }
-
- /**
- * Retrieves a list of entities satisfying given parameters.
- *
- * @param base A byte array prefix for the lookup
- * @param entityType The type of the entity
- * @param limit A limit on the number of entities to return
- * @param starttime The earliest entity start time to retrieve (exclusive)
- * @param endtime The latest entity start time to retrieve (inclusive)
- * @param fromId Retrieve entities starting with this entity
- * @param fromTs Ignore entities with insert timestamp later than this ts
- * @param secondaryFilters Filter pairs that the entities should match
- * @param fields The set of fields to retrieve
- * @return A list of entities
- * @throws IOException
- */
- private TimelineEntities getEntityByTime(byte[] base,
- String entityType, Long limit, Long starttime, Long endtime,
- String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
- EnumSet<Field> fields) throws IOException {
- DBIterator iterator = null;
- try {
- KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
- // only db keys matching the prefix (base + entity type) will be parsed
- byte[] prefix = kb.getBytesForLookup();
- if (endtime == null) {
- // if end time is null, place no restriction on end time
- endtime = Long.MAX_VALUE;
- }
- // construct a first key that will be seeked to using end time or fromId
- byte[] first = null;
- if (fromId != null) {
- Long fromIdStartTime = getStartTimeLong(fromId, entityType);
- if (fromIdStartTime == null) {
- // no start time for provided id, so return empty entities
- return new TimelineEntities();
- }
- if (fromIdStartTime <= endtime) {
- // if provided id's start time falls before the end of the window,
- // use it to construct the seek key
- first = kb.add(writeReverseOrderedLong(fromIdStartTime))
- .add(fromId).getBytesForLookup();
- }
- }
- // if seek key wasn't constructed using fromId, construct it using end ts
- if (first == null) {
- first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
- }
- byte[] last = null;
- if (starttime != null) {
- // if start time is not null, set a last key that will not be
- // iterated past
- last = KeyBuilder.newInstance().add(base).add(entityType)
- .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
- }
- if (limit == null) {
- // if limit is not specified, use the default
- limit = DEFAULT_LIMIT;
- }
-
- TimelineEntities entities = new TimelineEntities();
- iterator = db.iterator();
- iterator.seek(first);
- // iterate until one of the following conditions is met: limit is
- // reached, there are no more keys, the key prefix no longer matches,
- // or a start time has been specified and reached/exceeded
- while (entities.getEntities().size() < limit && iterator.hasNext()) {
- byte[] key = iterator.peekNext().getKey();
- if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
- WritableComparator.compareBytes(key, 0, key.length, last, 0,
- last.length) > 0)) {
- break;
- }
- // read the start time and entity id from the current key
- KeyParser kp = new KeyParser(key, prefix.length);
- Long startTime = kp.getNextLong();
- String entityId = kp.getNextString();
-
- if (fromTs != null) {
- long insertTime = readReverseOrderedLong(iterator.peekNext()
- .getValue(), 0);
- if (insertTime > fromTs) {
- byte[] firstKey = key;
- while (iterator.hasNext() && prefixMatches(firstKey,
- kp.getOffset(), key)) {
- iterator.next();
- key = iterator.peekNext().getKey();
- }
- continue;
- }
- }
-
- // parse the entity that owns this key, iterating over all keys for
- // the entity
- TimelineEntity entity = getEntity(entityId, entityType, startTime,
- fields, iterator, key, kp.getOffset());
- // determine if the retrieved entity matches the provided secondary
- // filters, and if so add it to the list of entities to return
- boolean filterPassed = true;
- if (secondaryFilters != null) {
- for (NameValuePair filter : secondaryFilters) {
- Object v = entity.getOtherInfo().get(filter.getName());
- if (v == null) {
- Set<Object> vs = entity.getPrimaryFilters()
- .get(filter.getName());
- if (vs != null && !vs.contains(filter.getValue())) {
- filterPassed = false;
- break;
- }
- } else if (!v.equals(filter.getValue())) {
- filterPassed = false;
- break;
- }
- }
- }
- if (filterPassed) {
- entities.addEntity(entity);
- }
- }
- return entities;
- } finally {
- IOUtils.cleanup(LOG, iterator);
- }
- }
-
- /**
- * Put a single entity. If there is an error, add a TimelinePutError to the
- * given response.
- */
- private void put(TimelineEntity entity, TimelinePutResponse response) {
- LockMap.CountingReentrantLock<EntityIdentifier> lock =
- writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
- entity.getEntityType()));
- lock.lock();
- WriteBatch writeBatch = null;
- List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
- new ArrayList<EntityIdentifier>();
- byte[] revStartTime = null;
- try {
- writeBatch = db.createWriteBatch();
- List<TimelineEvent> events = entity.getEvents();
- // look up the start time for the entity
- StartAndInsertTime startAndInsertTime = getAndSetStartTime(
- entity.getEntityId(), entity.getEntityType(),
- entity.getStartTime(), events);
- if (startAndInsertTime == null) {
- // if no start time is found, add an error and return
- TimelinePutError error = new TimelinePutError();
- error.setEntityId(entity.getEntityId());
- error.setEntityType(entity.getEntityType());
- error.setErrorCode(TimelinePutError.NO_START_TIME);
- response.addError(error);
- return;
- }
- revStartTime = writeReverseOrderedLong(startAndInsertTime
- .startTime);
-
- Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
-
- // write entity marker
- byte[] markerKey = createEntityMarkerKey(entity.getEntityId(),
- entity.getEntityType(), revStartTime);
- byte[] markerValue = writeReverseOrderedLong(startAndInsertTime
- .insertTime);
- writeBatch.put(markerKey, markerValue);
- writePrimaryFilterEntries(writeBatch, primaryFilters, markerKey,
- markerValue);
-
- // write event entries
- if (events != null && !events.isEmpty()) {
- for (TimelineEvent event : events) {
- byte[] revts = writeReverseOrderedLong(event.getTimestamp());
- byte[] key = createEntityEventKey(entity.getEntityId(),
- entity.getEntityType(), revStartTime, revts,
- event.getEventType());
- byte[] value = GenericObjectMapper.write(event.getEventInfo());
- writeBatch.put(key, value);
- writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
- }
- }
-
- // write related entity entries
- Map<String, Set<String>> relatedEntities =
- entity.getRelatedEntities();
- if (relatedEntities != null && !relatedEntities.isEmpty()) {
- for (Entry<String, Set<String>> relatedEntityList :
- relatedEntities.entrySet()) {
- String relatedEntityType = relatedEntityList.getKey();
- for (String relatedEntityId : relatedEntityList.getValue()) {
- // invisible "reverse" entries (entity -> related entity)
- byte[] key = createReverseRelatedEntityKey(entity.getEntityId(),
- entity.getEntityType(), revStartTime, relatedEntityId,
- relatedEntityType);
- writeBatch.put(key, EMPTY_BYTES);
- // look up start time of related entity
- byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
- relatedEntityType);
- // delay writing the related entity if no start time is found
- if (relatedEntityStartTime == null) {
- relatedEntitiesWithoutStartTimes.add(
- new EntityIdentifier(relatedEntityId, relatedEntityType));
- continue;
- }
- // write "forward" entry (related entity -> entity)
- key = createRelatedEntityKey(relatedEntityId,
- relatedEntityType, relatedEntityStartTime,
- entity.getEntityId(), entity.getEntityType());
- writeBatch.put(key, EMPTY_BYTES);
- }
- }
- }
-
- // write primary filter entries
- if (primaryFilters != null && !primaryFilters.isEmpty()) {
- for (Entry<String, Set<Object>> primaryFilter :
- primaryFilters.entrySet()) {
- for (Object primaryFilterValue : primaryFilter.getValue()) {
- byte[] key = createPrimaryFilterKey(entity.getEntityId(),
- entity.getEntityType(), revStartTime,
- primaryFilter.getKey(), primaryFilterValue);
- writeBatch.put(key, EMPTY_BYTES);
- writePrimaryFilterEntries(writeBatch, primaryFilters, key,
- EMPTY_BYTES);
- }
- }
- }
-
- // write other info entries
- Map<String, Object> otherInfo = entity.getOtherInfo();
- if (otherInfo != null && !otherInfo.isEmpty()) {
- for (Entry<String, Object> i : otherInfo.entrySet()) {
- byte[] key = createOtherInfoKey(entity.getEntityId(),
- entity.getEntityType(), revStartTime, i.getKey());
- byte[] value = GenericObjectMapper.write(i.getValue());
- writeBatch.put(key, value);
- writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
- }
- }
- db.write(writeBatch);
- } catch (IOException e) {
- LOG.error("Error putting entity " + entity.getEntityId() +
- " of type " + entity.getEntityType(), e);
- TimelinePutError error = new TimelinePutError();
- error.setEntityId(entity.getEntityId());
- error.setEntityType(entity.getEntityType());
- error.setErrorCode(TimelinePutError.IO_EXCEPTION);
- response.addError(error);
- } finally {
- lock.unlock();
- writeLocks.returnLock(lock);
- IOUtils.cleanup(LOG, writeBatch);
- }
-
- for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
- lock = writeLocks.getLock(relatedEntity);
- lock.lock();
- try {
- StartAndInsertTime relatedEntityStartAndInsertTime =
- getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(),
- readReverseOrderedLong(revStartTime, 0), null);
- if (relatedEntityStartAndInsertTime == null) {
- throw new IOException("Error setting start time for related entity");
- }
- byte[] relatedEntityStartTime = writeReverseOrderedLong(
- relatedEntityStartAndInsertTime.startTime);
- db.put(createRelatedEntityKey(relatedEntity.getId(),
- relatedEntity.getType(), relatedEntityStartTime,
- entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
- db.put(createEntityMarkerKey(relatedEntity.getId(),
- relatedEntity.getType(), relatedEntityStartTime),
- writeReverseOrderedLong(relatedEntityStartAndInsertTime
- .insertTime));
- } catch (IOException e) {
- LOG.error("Error putting related entity " + relatedEntity.getId() +
- " of type " + relatedEntity.getType() + " for entity " +
- entity.getEntityId() + " of type " + entity.getEntityType(), e);
- TimelinePutError error = new TimelinePutError();
- error.setEntityId(entity.getEntityId());
- error.setEntityType(entity.getEntityType());
- error.setErrorCode(TimelinePutError.IO_EXCEPTION);
- response.addError(error);
- } finally {
- lock.unlock();
- writeLocks.returnLock(lock);
- }
- }
- }
-
- /**
- * For a given key / value pair that has been written to the db,
- * write additional entries to the db for each primary filter.
- */
- private static void writePrimaryFilterEntries(WriteBatch writeBatch,
- Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
- throws IOException {
- if (primaryFilters != null && !primaryFilters.isEmpty()) {
- for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
- for (Object pfval : pf.getValue()) {
- writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval,
- key), value);
- }
- }
- }
- }
-
- @Override
- public TimelinePutResponse put(TimelineEntities entities) {
- try {
- deleteLock.readLock().lock();
- TimelinePutResponse response = new TimelinePutResponse();
- for (TimelineEntity entity : entities.getEntities()) {
- put(entity, response);
- }
- return response;
- } finally {
- deleteLock.readLock().unlock();
- }
- }
-
- /**
- * Get the unique start time for a given entity as a byte array that sorts
- * the timestamps in reverse order (see {@link
- * GenericObjectMapper#writeReverseOrderedLong(long)}).
- *
- * @param entityId The id of the entity
- * @param entityType The type of the entity
- * @return A byte array, null if not found
- * @throws IOException
- */
- private byte[] getStartTime(String entityId, String entityType)
- throws IOException {
- Long l = getStartTimeLong(entityId, entityType);
- return l == null ? null : writeReverseOrderedLong(l);
- }
-
- /**
- * Get the unique start time for a given entity as a Long.
- *
- * @param entityId The id of the entity
- * @param entityType The type of the entity
- * @return A Long, null if not found
- * @throws IOException
- */
- private Long getStartTimeLong(String entityId, String entityType)
- throws IOException {
- EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
- // start time is not provided, so try to look it up
- if (startTimeReadCache.containsKey(entity)) {
- // found the start time in the cache
- return startTimeReadCache.get(entity);
- } else {
- // try to look up the start time in the db
- byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
- byte[] v = db.get(b);
- if (v == null) {
- // did not find the start time in the db
- return null;
- } else {
- // found the start time in the db
- Long l = readReverseOrderedLong(v, 0);
- startTimeReadCache.put(entity, l);
- return l;
- }
- }
- }
-
- /**
- * Get the unique start time for a given entity as a byte array that sorts
- * the timestamps in reverse order (see {@link
- * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time
- * doesn't exist, set it based on the information provided. Should only be
- * called when a lock has been obtained on the entity.
- *
- * @param entityId The id of the entity
- * @param entityType The type of the entity
- * @param startTime The start time of the entity, or null
- * @param events A list of events for the entity, or null
- * @return A StartAndInsertTime
- * @throws IOException
- */
- private StartAndInsertTime getAndSetStartTime(String entityId,
- String entityType, Long startTime, List<TimelineEvent> events)
- throws IOException {
- EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
- if (startTime == null) {
- // start time is not provided, so try to look it up
- if (startTimeWriteCache.containsKey(entity)) {
- // found the start time in the cache
- return startTimeWriteCache.get(entity);
- } else {
- if (events != null) {
- // prepare a start time from events in case it is needed
- Long min = Long.MAX_VALUE;
- for (TimelineEvent e : events) {
- if (min > e.getTimestamp()) {
- min = e.getTimestamp();
- }
- }
- startTime = min;
- }
- return checkStartTimeInDb(entity, startTime);
- }
- } else {
- // start time is provided
- if (startTimeWriteCache.containsKey(entity)) {
- // always use start time from cache if it exists
- return startTimeWriteCache.get(entity);
- } else {
- // check the provided start time matches the db
- return checkStartTimeInDb(entity, startTime);
- }
- }
- }
-
- /**
- * Checks db for start time and returns it if it exists. If it doesn't
- * exist, writes the suggested start time (if it is not null). This is
- * only called when the start time is not found in the cache,
- * so it adds it back into the cache if it is found. Should only be called
- * when a lock has been obtained on the entity.
- */
- private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity,
- Long suggestedStartTime) throws IOException {
- StartAndInsertTime startAndInsertTime = null;
- // create lookup key for start time
- byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
- // retrieve value for key
- byte[] v = db.get(b);
- if (v == null) {
- // start time doesn't exist in db
- if (suggestedStartTime == null) {
- return null;
- }
- startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
- System.currentTimeMillis());
-
- // write suggested start time
- v = new byte[16];
- writeReverseOrderedLong(suggestedStartTime, v, 0);
- writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
- WriteOptions writeOptions = new WriteOptions();
- writeOptions.sync(true);
- db.put(b, v, writeOptions);
- } else {
- // found start time in db, so ignore suggested start time
- startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
- readReverseOrderedLong(v, 8));
- }
- startTimeWriteCache.put(entity, startAndInsertTime);
- startTimeReadCache.put(entity, startAndInsertTime.startTime);
- return startAndInsertTime;
- }
-
- /**
- * Creates a key for looking up the start time of a given entity,
- * of the form START_TIME_LOOKUP_PREFIX + entity type + entity id.
- */
- private static byte[] createStartTimeLookupKey(String entityId,
- String entityType) throws IOException {
- return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX)
- .add(entityType).add(entityId).getBytes();
- }
-
- /**
- * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type +
- * revstarttime + entity id.
- */
- private static byte[] createEntityMarkerKey(String entityId,
- String entityType, byte[] revStartTime) throws IOException {
- return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
- .add(entityType).add(revStartTime).add(entityId).getBytesForLookup();
- }
-
- /**
- * Creates an index entry for the given key of the form
- * INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key.
- */
- private static byte[] addPrimaryFilterToKey(String primaryFilterName,
- Object primaryFilterValue, byte[] key) throws IOException {
- return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
- .add(primaryFilterName)
- .add(GenericObjectMapper.write(primaryFilterValue), true).add(key)
- .getBytes();
- }
-
- /**
- * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entity type +
- * revstarttime + entity id + EVENTS_COLUMN + reveventtimestamp + event type.
- */
- private static byte[] createEntityEventKey(String entityId,
- String entityType, byte[] revStartTime, byte[] revEventTimestamp,
- String eventType) throws IOException {
- return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
- .add(entityType).add(revStartTime).add(entityId).add(EVENTS_COLUMN)
- .add(revEventTimestamp).add(eventType).getBytes();
- }
-
- /**
- * Creates an event object from the given key, offset, and value. If the
- * event type is not contained in the specified set of event types,
- * returns null.
- */
- private static TimelineEvent getEntityEvent(Set<String> eventTypes,
- byte[] key, int offset, byte[] value) throws IOException {
- KeyParser kp = new KeyParser(key, offset);
- long ts = kp.getNextLong();
- String tstype = kp.getNextString();
- if (eventTypes == null || eventTypes.contains(tstype)) {
- TimelineEvent event = new TimelineEvent();
- event.setTimestamp(ts);
- event.setEventType(tstype);
- Object o = GenericObjectMapper.read(value);
- if (o == null) {
- event.setEventInfo(null);
- } else if (o instanceof Map) {
- @SuppressWarnings("unchecked")
- Map<String, Object> m = (Map<String, Object>) o;
- event.setEventInfo(m);
- } else {
- throw new IOException("Couldn't deserialize event info map");
- }
- return event;
- }
- return null;
- }
-
- /**
- * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX +
- * entity type + revstarttime + entity id + PRIMARY_FILTERS_COLUMN + name +
- * value.
- */
- private static byte[] createPrimaryFilterKey(String entityId,
- String entityType, byte[] revStartTime, String name, Object value)
- throws IOException {
- return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
- .add(revStartTime).add(entityId).add(PRIMARY_FILTERS_COLUMN).add(name)
- .add(GenericObjectMapper.write(value)).getBytes();
- }
-
- /**
- * Parses the primary filter from the given key at the given offset and
- * adds it to the given entity.
- */
- private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
- int offset) throws IOException {
- KeyParser kp = new KeyParser(key, offset);
- String name = kp.getNextString();
- Object value = GenericObjectMapper.read(key, kp.getOffset());
- entity.addPrimaryFilter(name, value);
- }
-
- /**
- * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entity type +
- * revstarttime + entity id + OTHER_INFO_COLUMN + name.
- */
- private static byte[] createOtherInfoKey(String entityId, String entityType,
- byte[] revStartTime, String name) throws IOException {
- return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
- .add(revStartTime).add(entityId).add(OTHER_INFO_COLUMN).add(name)
- .getBytes();
- }
-
- /**
- * Creates a string representation of the byte array from the given offset
- * to the end of the array (for parsing other info keys).
- */
- private static String parseRemainingKey(byte[] b, int offset) {
- return new String(b, offset, b.length - offset);
- }
-
- /**
- * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX +
- * entity type + revstarttime + entity id + RELATED_ENTITIES_COLUMN +
- * relatedentity type + relatedentity id.
- */
- private static byte[] createRelatedEntityKey(String entityId,
- String entityType, byte[] revStartTime, String relatedEntityId,
- String relatedEntityType) throws IOException {
- return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
- .add(revStartTime).add(entityId).add(RELATED_ENTITIES_COLUMN)
- .add(relatedEntityType).add(relatedEntityId).getBytes();
- }
-
- /**
- * Parses the related entity from the given key at the given offset and
- * adds it to the given entity.
- */
- private static void addRelatedEntity(TimelineEntity entity, byte[] key,
- int offset) throws IOException {
- KeyParser kp = new KeyParser(key, offset);
- String type = kp.getNextString();
- String id = kp.getNextString();
- entity.addRelatedEntity(type, id);
- }
-
- /**
- * Creates a reverse related entity key, serializing ENTITY_ENTRY_PREFIX +
- * entity type + revstarttime + entity id +
- * INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN +
- * relatedentity type + relatedentity id.
- */
- private static byte[] createReverseRelatedEntityKey(String entityId,
- String entityType, byte[] revStartTime, String relatedEntityId,
- String relatedEntityType) throws IOException {
- return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
- .add(revStartTime).add(entityId)
- .add(INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN)
- .add(relatedEntityType).add(relatedEntityId).getBytes();
- }
-
- /**
- * Clears the cache to test reloading start times from leveldb (only for
- * testing).
- */
- @VisibleForTesting
- void clearStartTimeCache() {
- startTimeWriteCache.clear();
- startTimeReadCache.clear();
- }
-
- @VisibleForTesting
- static int getStartTimeReadCacheSize(Configuration conf) {
- return conf.getInt(
- YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
- YarnConfiguration.
- DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
- }
-
- @VisibleForTesting
- static int getStartTimeWriteCacheSize(Configuration conf) {
- return conf.getInt(
- YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
- YarnConfiguration.
- DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
- }
-
- // warning is suppressed to prevent eclipse from noting unclosed resource
- @SuppressWarnings("resource")
- @VisibleForTesting
- List<String> getEntityTypes() throws IOException {
- DBIterator iterator = null;
- try {
- iterator = getDbIterator(false);
- List<String> entityTypes = new ArrayList<String>();
- iterator.seek(ENTITY_ENTRY_PREFIX);
- while (iterator.hasNext()) {
- byte[] key = iterator.peekNext().getKey();
- if (key[0] != ENTITY_ENTRY_PREFIX[0]) {
- break;
- }
- KeyParser kp = new KeyParser(key,
- ENTITY_ENTRY_PREFIX.length);
- String entityType = kp.getNextString();
- entityTypes.add(entityType);
- byte[] lookupKey = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
- .add(entityType).getBytesForLookup();
- if (lookupKey[lookupKey.length - 1] != 0x0) {
- throw new IOException("Found unexpected end byte in lookup key");
- }
- lookupKey[lookupKey.length - 1] = 0x1;
- iterator.seek(lookupKey);
- }
- return entityTypes;
- } finally {
- IOUtils.cleanup(LOG, iterator);
- }
- }
-
- /**
- * Finds all keys in the db that have a given prefix and deletes them on
- * the given write batch.
- */
- private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
- DBIterator iterator) {
- for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) {
- byte[] key = iterator.peekNext().getKey();
- if (!prefixMatches(prefix, prefix.length, key)) {
- break;
- }
- writeBatch.delete(key);
- }
- }
-
- @VisibleForTesting
- boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
- DBIterator iterator, DBIterator pfIterator, boolean seeked)
- throws IOException {
- WriteBatch writeBatch = null;
- try {
- KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
- .add(entityType);
- byte[] typePrefix = kb.getBytesForLookup();
- kb.add(reverseTimestamp);
- if (!seeked) {
- iterator.seek(kb.getBytesForLookup());
- }
- if (!iterator.hasNext()) {
- return false;
- }
- byte[] entityKey = iterator.peekNext().getKey();
- if (!prefixMatches(typePrefix, typePrefix.length, entityKey)) {
- return false;
- }
-
- // read the start time and entity id from the current key
- KeyParser kp = new KeyParser(entityKey, typePrefix.length + 8);
- String entityId = kp.getNextString();
- int prefixlen = kp.getOffset();
- byte[] deletePrefix = new byte[prefixlen];
- System.arraycopy(entityKey, 0, deletePrefix, 0, prefixlen);
-
- writeBatch = db.createWriteBatch();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Deleting entity type:" + entityType + " id:" + entityId);
- }
- // remove start time from cache and db
- writeBatch.delete(createStartTimeLookupKey(entityId, entityType));
- EntityIdentifier entityIdentifier =
- new EntityIdentifier(entityId, entityType);
- startTimeReadCache.remove(entityIdentifier);
- startTimeWriteCache.remove(entityIdentifier);
-
- // delete current entity
- for (; iterator.hasNext(); iterator.next()) {
- byte[] key = iterator.peekNext().getKey();
- if (!prefixMatches(entityKey, prefixlen, key)) {
- break;
- }
- writeBatch.delete(key);
-
- if (key.length == prefixlen) {
- continue;
- }
- if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
- kp = new KeyParser(key,
- prefixlen + PRIMARY_FILTERS_COLUMN.length);
- String name = kp.getNextString();
- Object value = GenericObjectMapper.read(key, kp.getOffset());
- deleteKeysWithPrefix(writeBatch, addPrimaryFilterToKey(name, value,
- deletePrefix), pfIterator);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Deleting entity type:" + entityType + " id:" +
- entityId + " primary filter entry " + name + " " +
- value);
- }
- } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
- kp = new KeyParser(key,
- prefixlen + RELATED_ENTITIES_COLUMN.length);
- String type = kp.getNextString();
- String id = kp.getNextString();
- byte[] relatedEntityStartTime = getStartTime(id, type);
- if (relatedEntityStartTime == null) {
- LOG.warn("Found no start time for " +
- "related entity " + id + " of type " + type + " while " +
- "deleting " + entityId + " of type " + entityType);
- continue;
- }
- writeBatch.delete(createReverseRelatedEntityKey(id, type,
- relatedEntityStartTime, entityId, entityType));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Deleting entity type:" + entityType + " id:" +
- entityId + " from invisible reverse related entity " +
- "entry of type:" + type + " id:" + id);
- }
- } else if (key[prefixlen] ==
- INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
- kp = new KeyParser(key, prefixlen +
- INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN.length);
- String type = kp.getNextString();
- String id = kp.getNextString();
- byte[] relatedEntityStartTime = getStartTime(id, type);
- if (relatedEntityStartTime == null) {
- LOG.warn("Found no start time for reverse " +
- "related entity " + id + " of type " + type + " while " +
- "deleting " + entityId + " of type " + entityType);
- continue;
- }
- writeBatch.delete(createRelatedEntityKey(id, type,
- relatedEntityStartTime, entityId, entityType));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Deleting entity type:" + entityType + " id:" +
- entityId + " from related entity entry of type:" +
- type + " id:" + id);
- }
- }
- }
- WriteOptions writeOptions = new WriteOptions();
- writeOptions.sync(true);
- db.write(writeBatch, writeOptions);
- return true;
- } finally {
- IOUtils.cleanup(LOG, writeBatch);
- }
- }
-
- /**
- * Discards entities with start timestamp less than or equal to the given
- * timestamp.
- */
- @VisibleForTesting
- void discardOldEntities(long timestamp)
- throws IOException, InterruptedException {
- byte[] reverseTimestamp = writeReverseOrderedLong(timestamp);
- long totalCount = 0;
- long t1 = System.currentTimeMillis();
- try {
- List<String> entityTypes = getEntityTypes();
- for (String entityType : entityTypes) {
- DBIterator iterator = null;
- DBIterator pfIterator = null;
- long typeCount = 0;
- try {
- deleteLock.writeLock().lock();
- iterator = getDbIterator(false);
- pfIterator = getDbIterator(false);
-
- if (deletionThread != null && deletionThread.isInterrupted()) {
- throw new InterruptedException();
- }
- boolean seeked = false;
- while (deleteNextEntity(entityType, reverseTimestamp, iterator,
- pfIterator, seeked)) {
- typeCount++;
- totalCount++;
- seeked = true;
- if (deletionThread != null && deletionThread.isInterrupted()) {
- throw new InterruptedException();
- }
- }
- } catch (IOException e) {
- LOG.error("Got IOException while deleting entities for type " +
- entityType + ", continuing to next type", e);
- } finally {
- IOUtils.cleanup(LOG, iterator, pfIterator);
- deleteLock.writeLock().unlock();
- if (typeCount > 0) {
- LOG.info("Deleted " + typeCount + " entities of type " +
- entityType);
- }
- }
- }
- } finally {
- long t2 = System.currentTimeMillis();
- LOG.info("Discarded " + totalCount + " entities for timestamp " +
- timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds");
- }
- }
-
- @VisibleForTesting
- DBIterator getDbIterator(boolean fillCache) {
- ReadOptions readOptions = new ReadOptions();
- readOptions.fillCache(fillCache);
- return db.iterator(readOptions);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java
deleted file mode 100644
index 86ac1f8..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
-
-/**
- * In-memory implementation of {@link TimelineStore}. This
- * implementation is for test purpose only. If users improperly instantiate it,
- * they may encounter reading and writing history data in different memory
- * store.
- *
- */
-@Private
-@Unstable
-public class MemoryTimelineStore
- extends AbstractService implements TimelineStore {
-
- private Map<EntityIdentifier, TimelineEntity> entities =
- new HashMap<EntityIdentifier, TimelineEntity>();
- private Map<EntityIdentifier, Long> entityInsertTimes =
- new HashMap<EntityIdentifier, Long>();
-
- public MemoryTimelineStore() {
- super(MemoryTimelineStore.class.getName());
- }
-
- @Override
- public TimelineEntities getEntities(String entityType, Long limit,
- Long windowStart, Long windowEnd, String fromId, Long fromTs,
- NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
- EnumSet<Field> fields) {
- if (limit == null) {
- limit = DEFAULT_LIMIT;
- }
- if (windowStart == null) {
- windowStart = Long.MIN_VALUE;
- }
- if (windowEnd == null) {
- windowEnd = Long.MAX_VALUE;
- }
- if (fields == null) {
- fields = EnumSet.allOf(Field.class);
- }
-
- Iterator<TimelineEntity> entityIterator = null;
- if (fromId != null) {
- TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
- entityType));
- if (firstEntity == null) {
- return new TimelineEntities();
- } else {
- entityIterator = new TreeSet<TimelineEntity>(entities.values())
- .tailSet(firstEntity, true).iterator();
- }
- }
- if (entityIterator == null) {
- entityIterator = new PriorityQueue<TimelineEntity>(entities.values())
- .iterator();
- }
-
- List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
- while (entityIterator.hasNext()) {
- TimelineEntity entity = entityIterator.next();
- if (entitiesSelected.size() >= limit) {
- break;
- }
- if (!entity.getEntityType().equals(entityType)) {
- continue;
- }
- if (entity.getStartTime() <= windowStart) {
- continue;
- }
- if (entity.getStartTime() > windowEnd) {
- continue;
- }
- if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
- entity.getEntityId(), entity.getEntityType())) > fromTs) {
- continue;
- }
- if (primaryFilter != null &&
- !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
- continue;
- }
- if (secondaryFilters != null) { // AND logic
- boolean flag = true;
- for (NameValuePair secondaryFilter : secondaryFilters) {
- if (secondaryFilter != null && !matchPrimaryFilter(
- entity.getPrimaryFilters(), secondaryFilter) &&
- !matchFilter(entity.getOtherInfo(), secondaryFilter)) {
- flag = false;
- break;
- }
- }
- if (!flag) {
- continue;
- }
- }
- entitiesSelected.add(entity);
- }
- List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
- for (TimelineEntity entitySelected : entitiesSelected) {
- entitiesToReturn.add(maskFields(entitySelected, fields));
- }
- Collections.sort(entitiesToReturn);
- TimelineEntities entitiesWrapper = new TimelineEntities();
- entitiesWrapper.setEntities(entitiesToReturn);
- return entitiesWrapper;
- }
-
- @Override
- public TimelineEntity getEntity(String entityId, String entityType,
- EnumSet<Field> fieldsToRetrieve) {
- if (fieldsToRetrieve == null) {
- fieldsToRetrieve = EnumSet.allOf(Field.class);
- }
- TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
- if (entity == null) {
- return null;
- } else {
- return maskFields(entity, fieldsToRetrieve);
- }
- }
-
- @Override
- public TimelineEvents getEntityTimelines(String entityType,
- SortedSet<String> entityIds, Long limit, Long windowStart,
- Long windowEnd,
- Set<String> eventTypes) {
- TimelineEvents allEvents = new TimelineEvents();
- if (entityIds == null) {
- return allEvents;
- }
- if (limit == null) {
- limit = DEFAULT_LIMIT;
- }
- if (windowStart == null) {
- windowStart = Long.MIN_VALUE;
- }
- if (windowEnd == null) {
- windowEnd = Long.MAX_VALUE;
- }
- for (String entityId : entityIds) {
- EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
- TimelineEntity entity = entities.get(entityID);
- if (entity == null) {
- continue;
- }
- EventsOfOneEntity events = new EventsOfOneEntity();
- events.setEntityId(entityId);
- events.setEntityType(entityType);
- for (TimelineEvent event : entity.getEvents()) {
- if (events.getEvents().size() >= limit) {
- break;
- }
- if (event.getTimestamp() <= windowStart) {
- continue;
- }
- if (event.getTimestamp() > windowEnd) {
- continue;
- }
- if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
- continue;
- }
- events.addEvent(event);
- }
- allEvents.addEvent(events);
- }
- return allEvents;
- }
-
- @Override
- public TimelinePutResponse put(TimelineEntities data) {
- TimelinePutResponse response = new TimelinePutResponse();
- for (TimelineEntity entity : data.getEntities()) {
- EntityIdentifier entityId =
- new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
- // store entity info in memory
- TimelineEntity existingEntity = entities.get(entityId);
- if (existingEntity == null) {
- existingEntity = new TimelineEntity();
- existingEntity.setEntityId(entity.getEntityId());
- existingEntity.setEntityType(entity.getEntityType());
- existingEntity.setStartTime(entity.getStartTime());
- entities.put(entityId, existingEntity);
- entityInsertTimes.put(entityId, System.currentTimeMillis());
- }
- if (entity.getEvents() != null) {
- if (existingEntity.getEvents() == null) {
- existingEntity.setEvents(entity.getEvents());
- } else {
- existingEntity.addEvents(entity.getEvents());
- }
- Collections.sort(existingEntity.getEvents());
- }
- // check startTime
- if (existingEntity.getStartTime() == null) {
- if (existingEntity.getEvents() == null
- || existingEntity.getEvents().isEmpty()) {
- TimelinePutError error = new TimelinePutError();
- error.setEntityId(entityId.getId());
- error.setEntityType(entityId.getType());
- error.setErrorCode(TimelinePutError.NO_START_TIME);
- response.addError(error);
- entities.remove(entityId);
- entityInsertTimes.remove(entityId);
- continue;
- } else {
- Long min = Long.MAX_VALUE;
- for (TimelineEvent e : entity.getEvents()) {
- if (min > e.getTimestamp()) {
- min = e.getTimestamp();
- }
- }
- existingEntity.setStartTime(min);
- }
- }
- if (entity.getPrimaryFilters() != null) {
- if (existingEntity.getPrimaryFilters() == null) {
- existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
- }
- for (Entry<String, Set<Object>> pf :
- entity.getPrimaryFilters().entrySet()) {
- for (Object pfo : pf.getValue()) {
- existingEntity.addPrimaryFilter(pf.getKey(), maybeConvert(pfo));
- }
- }
- }
- if (entity.getOtherInfo() != null) {
- if (existingEntity.getOtherInfo() == null) {
- existingEntity.setOtherInfo(new HashMap<String, Object>());
- }
- for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
- existingEntity.addOtherInfo(info.getKey(),
- maybeConvert(info.getValue()));
- }
- }
- // relate it to other entities
- if (entity.getRelatedEntities() == null) {
- continue;
- }
- for (Map.Entry<String, Set<String>> partRelatedEntities : entity
- .getRelatedEntities().entrySet()) {
- if (partRelatedEntities == null) {
- continue;
- }
- for (String idStr : partRelatedEntities.getValue()) {
- EntityIdentifier relatedEntityId =
- new EntityIdentifier(idStr, partRelatedEntities.getKey());
- TimelineEntity relatedEntity = entities.get(relatedEntityId);
- if (relatedEntity != null) {
- relatedEntity.addRelatedEntity(
- existingEntity.getEntityType(), existingEntity.getEntityId());
- } else {
- relatedEntity = new TimelineEntity();
- relatedEntity.setEntityId(relatedEntityId.getId());
- relatedEntity.setEntityType(relatedEntityId.getType());
- relatedEntity.setStartTime(existingEntity.getStartTime());
- relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
- existingEntity.getEntityId());
- entities.put(relatedEntityId, relatedEntity);
- entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
- }
- }
- }
- }
- return response;
- }
-
- private static TimelineEntity maskFields(
- TimelineEntity entity, EnumSet<Field> fields) {
- // Conceal the fields that are not going to be exposed
- TimelineEntity entityToReturn = new TimelineEntity();
- entityToReturn.setEntityId(entity.getEntityId());
- entityToReturn.setEntityType(entity.getEntityType());
- entityToReturn.setStartTime(entity.getStartTime());
- entityToReturn.setEvents(fields.contains(Field.EVENTS) ?
- entity.getEvents() : fields.contains(Field.LAST_EVENT_ONLY) ?
- Arrays.asList(entity.getEvents().get(0)) : null);
- entityToReturn.setRelatedEntities(fields.contains(Field.RELATED_ENTITIES) ?
- entity.getRelatedEntities() : null);
- entityToReturn.setPrimaryFilters(fields.contains(Field.PRIMARY_FILTERS) ?
- entity.getPrimaryFilters() : null);
- entityToReturn.setOtherInfo(fields.contains(Field.OTHER_INFO) ?
- entity.getOtherInfo() : null);
- return entityToReturn;
- }
-
- private static boolean matchFilter(Map<String, Object> tags,
- NameValuePair filter) {
- Object value = tags.get(filter.getName());
- if (value == null) { // doesn't have the filter
- return false;
- } else if (!value.equals(filter.getValue())) { // doesn't match the filter
- return false;
- }
- return true;
- }
-
- private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
- NameValuePair filter) {
- Set<Object> value = tags.get(filter.getName());
- if (value == null) { // doesn't have the filter
- return false;
- } else {
- return value.contains(filter.getValue());
- }
- }
-
- private static Object maybeConvert(Object o) {
- if (o instanceof Long) {
- Long l = (Long)o;
- if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
- return l.intValue();
- }
- }
- return o;
- }
-
-}