You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:30 UTC
[13/50] [abbrv] hadoop git commit: YARN-3901. Populate flow run data
in the flow_run & flow activity tables (Vrushali C via sjlee)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
new file mode 100644
index 0000000..af8df99
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow activity table stores the daily activity record for flows.
+ * It has a single column family ("info") and is useful as a quick lookup
+ * of which flows were running on a given day.
+ *
+ * Example flow activity table record:
+ *
+ * <pre>
+ * |-------------------------------------------|
+ * | Row key | Column Family |
+ * | | info |
+ * |-------------------------------------------|
+ * | clusterId! | r!runid1:version1 |
+ * | inv Top of | |
+ * | Day! | r!runid2:version7 |
+ * | userName! | |
+ * | flowId | |
+ * |-------------------------------------------|
+ * </pre>
+ */
+public class FlowActivityTable extends BaseTable<FlowActivityTable> {
+  /** flow activity table prefix */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity";
+
+  /** config param name that specifies the flowactivity table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for flowactivity table name */
+  public static final String DEFAULT_TABLE_NAME =
+      "timelineservice.flowactivity";
+
+  private static final Log LOG = LogFactory.getLog(FlowActivityTable.class);
+
+  /** default max number of versions */
+  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+  public FlowActivityTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates the flow activity table if it does not already exist; fails if
+   * it does, mirroring how map-reduce jobs treat an existing output dir.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor flowActivityTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    // fully configure the column family before attaching it to the table
+    infoCF.setMinVersions(1);
+    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+    flowActivityTableDescp.addFamily(infoCF);
+
+    // TODO: figure the split policy before running in production
+    admin.createTable(flowActivityTableDescp);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
new file mode 100644
index 0000000..ad30add
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies the fully qualified columns of the {@link FlowRunTable}.
+ */
+public enum FlowRunColumn implements Column<FlowRunTable> {
+
+  /**
+   * When the flow was started. This is the minimum of the currently known
+   * application start times.
+   */
+  MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
+      AggregationOperation.MIN),
+
+  /**
+   * When the flow ended. This is the maximum of the currently known
+   * application end times.
+   */
+  MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
+      AggregationOperation.MAX),
+
+  /**
+   * The version of the flow that this flow belongs to.
+   */
+  FLOW_VERSION(FlowRunColumnFamily.INFO, "flow_version", null);
+
+  /** Helper that performs the actual reads and writes for this column. */
+  private final ColumnHelper<FlowRunTable> column;
+  /** Column family this column lives in. */
+  private final ColumnFamily<FlowRunTable> columnFamily;
+  /** Plain-text qualifier name. */
+  private final String columnQualifier;
+  /** Separator-encoded qualifier bytes. */
+  private final byte[] columnQualifierBytes;
+  /** Aggregation applied to this column's values; may be null. */
+  private final AggregationOperation aggOp;
+
+  private FlowRunColumn(ColumnFamily<FlowRunTable> family, String qualifier,
+      AggregationOperation operation) {
+    this.columnFamily = family;
+    this.columnQualifier = qualifier;
+    this.aggOp = operation;
+    // Encode the qualifier once up front so reads and writes share bytes.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(qualifier));
+    this.column = new ColumnHelper<FlowRunTable>(family);
+  }
+
+  /**
+   * @return the plain-text name of this column's qualifier
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  /** @return a defensive copy of the encoded qualifier bytes */
+  public byte[] getColumnQualifierBytes() {
+    return columnQualifierBytes.clone();
+  }
+
+  /** @return the aggregation operation for this column, or null if none */
+  public AggregationOperation getAggregationOperation() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Writes this column's value for the given row, tagging it with this
+   * column's aggregation operation combined with any caller attributes.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.Column#store
+   * (byte[], org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
+      Object inputValue, Attribute... attributes) throws IOException {
+    Attribute[] merged =
+        TimelineWriterUtils.combineAttributes(attributes, aggOp);
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue, merged);
+  }
+
+  /** Reads this column's value from the given {@link Result}. */
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Retrieve an {@link FlowRunColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowRunColumn} or null
+   */
+  public static final FlowRunColumn columnFor(String columnQualifier) {
+    // Only the qualifier is compared; the column family is assumed to match.
+    for (FlowRunColumn candidate : FlowRunColumn.values()) {
+      if (candidate.getColumnQualifier().equals(columnQualifier)) {
+        return candidate;
+      }
+    }
+    // No constant carries that qualifier.
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link FlowRunColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily
+   *          The columnFamily for which to retrieve the column.
+   * @param name
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowRunColumn} or null if both arguments
+   *         don't match.
+   */
+  public static final FlowRunColumn columnFor(FlowRunColumnFamily columnFamily,
+      String name) {
+    // Both the family and the qualifier must match.
+    for (FlowRunColumn candidate : FlowRunColumn.values()) {
+      if (candidate.columnFamily.equals(columnFamily)
+          && candidate.getColumnQualifier().equals(name)) {
+        return candidate;
+      }
+    }
+    // No constant matches on both criteria.
+    return null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
new file mode 100644
index 0000000..8faf5f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the column families of the flow run table.
+ */
+public enum FlowRunColumnFamily implements ColumnFamily<FlowRunTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i");
+
+  /** Separator-encoded byte representation of this column family's name. */
+  private final byte[] familyBytes;
+
+  /**
+   * @param name
+   *          create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private FlowRunColumnFamily(String name) {
+    // column families should be lower case and not contain any spaces;
+    // encoding via the separator guards the stored byte form.
+    this.familyBytes = Bytes.toBytes(Separator.SPACE.encode(name));
+  }
+
+  /** @return a defensive copy of this family's name bytes */
+  public byte[] getBytes() {
+    return Bytes.copy(familyBytes);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
new file mode 100644
index 0000000..d55f510
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies partially qualified columns for the {@link FlowRunTable}.
+ */
+public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
+
+ /**
+ * To store flow run metric values under the "m" prefix; stored values are
+ * tagged with the SUM aggregation operation.
+ */
+ METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM);
+
+ /** Helper that performs the actual column reads and writes. */
+ private final ColumnHelper<FlowRunTable> column;
+ /** Column family this prefix belongs to. */
+ private final ColumnFamily<FlowRunTable> columnFamily;
+
+ /**
+ * Can be null for those cases where the provided column qualifier is the
+ * entire column name.
+ */
+ private final String columnPrefix;
+ // Separator-encoded form of columnPrefix; null when columnPrefix is null.
+ private final byte[] columnPrefixBytes;
+
+ /** Aggregation operation applied to values stored under this prefix. */
+ private final AggregationOperation aggOp;
+
+ /**
+ * Private constructor, meant to be used by the enum definition.
+ *
+ * @param columnFamily
+ * that this column is stored in.
+ * @param columnPrefix
+ * for this column.
+ * @param fra
+ * the aggregation operation to tag stored values with; may be null.
+ */
+ private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
+ String columnPrefix, AggregationOperation fra) {
+ column = new ColumnHelper<FlowRunTable>(columnFamily);
+ this.columnFamily = columnFamily;
+ this.columnPrefix = columnPrefix;
+ if (columnPrefix == null) {
+ this.columnPrefixBytes = null;
+ } else {
+ // Future-proof by ensuring the right column prefix hygiene.
+ this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
+ .encode(columnPrefix));
+ }
+ this.aggOp = fra;
+ }
+
+ /**
+ * @return the column name value
+ */
+ public String getColumnPrefix() {
+ return columnPrefix;
+ }
+
+ /**
+ * @return a defensive copy of the encoded prefix bytes.
+ * NOTE(review): this will NPE if a constant is ever declared with a null
+ * prefix; currently the only constant (METRIC) has a non-null prefix.
+ */
+ public byte[] getColumnPrefixBytes() {
+ return columnPrefixBytes.clone();
+ }
+
+ /** @return the aggregation operation associated with this prefix. */
+ public AggregationOperation getAttribute() {
+ return aggOp;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Stores {@code inputValue} under {@code prefix + qualifier}, combining the
+ * caller's attributes with this prefix's aggregation operation.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #store(byte[],
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+ */
+ public void store(byte[] rowKey,
+ TypedBufferedMutator<FlowRunTable> tableMutator, String qualifier,
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
+
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifier);
+ Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+ attributes, this.aggOp);
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ combinedAttributes);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Same as the String-qualifier overload, for callers that already hold the
+ * qualifier as raw bytes.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #store(byte[],
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+ */
+ public void store(byte[] rowKey,
+ TypedBufferedMutator<FlowRunTable> tableMutator, byte[] qualifier,
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
+
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifier);
+ Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+ attributes, this.aggOp);
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ combinedAttributes);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Reads the value stored under {@code prefix + qualifier} from the result.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+ */
+ public Object readResult(Result result, String qualifier) throws IOException {
+ byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifier);
+ return column.readResult(result, columnQualifier);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Reads all values stored under this prefix, keyed by qualifier.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readResults(org.apache.hadoop.hbase.client.Result)
+ */
+ public Map<String, Object> readResults(Result result) throws IOException {
+ return column.readResults(result, columnPrefixBytes);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Reads all values stored under this prefix, keyed by qualifier and then by
+ * cell timestamp.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+ */
+ public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps(
+ Result result) throws IOException {
+ return column.readResultsWithTimestamps(result, columnPrefixBytes);
+ }
+
+ /**
+ * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is
+ * no match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+ * and only if {@code x.equals(y)} or {@code (x == y == null)}
+ *
+ * @param columnPrefix
+ * Name of the column to retrieve
+ * @return the corresponding {@link FlowRunColumnPrefix} or null
+ */
+ public static final FlowRunColumnPrefix columnFor(String columnPrefix) {
+
+ // Match column based on value, assume column family matches.
+ for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
+ // Find a match based only on name.
+ if (frcp.getColumnPrefix().equals(columnPrefix)) {
+ return frcp;
+ }
+ }
+
+ // Default to null
+ return null;
+ }
+
+ /**
+ * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is
+ * no match. The following holds true:
+ * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+ * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
+ *
+ * @param columnFamily
+ * The columnFamily for which to retrieve the column.
+ * @param columnPrefix
+ * Name of the column to retrieve
+ * @return the corresponding {@link FlowRunColumnPrefix} or null if both
+ * arguments don't match.
+ */
+ public static final FlowRunColumnPrefix columnFor(
+ FlowRunColumnFamily columnFamily, String columnPrefix) {
+
+ // TODO: needs unit test to confirm and need to update javadoc to explain
+ // null prefix case.
+
+ for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
+ // Find a match based column family and on name.
+ // Both-null prefixes are treated as a match, hence the extra checks.
+ if (frcp.columnFamily.equals(columnFamily)
+ && (((columnPrefix == null) && (frcp.getColumnPrefix() == null)) || (frcp
+ .getColumnPrefix().equals(columnPrefix)))) {
+ return frcp;
+ }
+ }
+
+ // Default to null
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
new file mode 100644
index 0000000..f743e5e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+
+/**
+ * Region observer for the flow run table. On writes it attaches the tags
+ * carried as {@link Put} attributes to every cell and assigns unique
+ * timestamps to cells whose timestamp was left unset; on reads it wraps the
+ * region scanner in a {@link FlowScanner} so that cell versions are
+ * processed correctly.
+ */
+public class FlowRunCoprocessor extends BaseRegionObserver {
+
+  @SuppressWarnings("unused")
+  private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
+
+  private HRegion region;
+  /**
+   * generate a timestamp that is unique per row in a region this is per region
+   */
+  private final TimestampGenerator timestampGenerator =
+      new TimestampGenerator();
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+    // Keep a handle on the region so read hooks can open scanners on it.
+    if (e instanceof RegionCoprocessorEnvironment) {
+      RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+      this.region = env.getRegion();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * This method adds the tags onto the cells in the Put. It is presumed that
+   * all the cells in one Put have the same set of Tags. The existing cell
+   * timestamp is overwritten for non-metric cells and each such cell gets a new
+   * unique timestamp generated by {@link TimestampGenerator}
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
+   * .hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Put,
+   * org.apache.hadoop.hbase.regionserver.wal.WALEdit,
+   * org.apache.hadoop.hbase.client.Durability)
+   */
+  @Override
+  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
+      WALEdit edit, Durability durability) throws IOException {
+    Map<String, byte[]> attributes = put.getAttributesMap();
+
+    // Assumption is that all the cells in a put are the same operation.
+    List<Tag> tags = new ArrayList<>();
+    if ((attributes != null) && (attributes.size() > 0)) {
+      for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
+        Tag t = TimelineWriterUtils.getTagFromAttribute(attribute);
+        tags.add(t);
+      }
+      byte[] tagByteArray = Tag.fromList(tags);
+      NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
+          Bytes.BYTES_COMPARATOR);
+      for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
+          .entrySet()) {
+        List<Cell> newCells = new ArrayList<>(entry.getValue().size());
+        for (Cell cell : entry.getValue()) {
+          // for each cell in the put add the tags
+          // Assumption is that all the cells in
+          // one put are the same operation
+          // also, get a unique cell timestamp for non-metric cells
+          // this way we don't inadvertently overwrite cell versions
+          long cellTimestamp = getCellTimestamp(cell.getTimestamp());
+          newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
+              CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
+              cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
+              tagByteArray));
+        }
+        newFamilyMap.put(entry.getKey(), newCells);
+      } // for each entry
+      // Update the family map for the Put
+      put.setFamilyCellMap(newFamilyMap);
+    }
+  }
+
+  /**
+   * Determines if the current cell's timestamp is to be used or a new unique
+   * cell timestamp is to be used. The reason this is done is to avoid
+   * inadvertently overwriting cells when writes come in very fast. But for
+   * metric cells, the cell timestamp signifies the metric timestamp. Hence
+   * we don't want to overwrite it.
+   *
+   * @param timestamp the timestamp currently carried by the cell
+   * @return the cell timestamp to use: the given one if it was explicitly
+   *         set, otherwise a fresh unique timestamp for this region
+   */
+  private long getCellTimestamp(long timestamp) {
+    // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
+    // then use the generator
+    if (timestamp == HConstants.LATEST_TIMESTAMP) {
+      return timestampGenerator.getUniqueTimestamp();
+    } else {
+      return timestamp;
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates a {@link FlowScanner} Scan so that it can correctly process the
+   * contents of {@link FlowRunTable}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
+   * .hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Get, java.util.List)
+   */
+  @Override
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
+      Get get, List<Cell> results) throws IOException {
+    Scan scan = new Scan(get);
+    scan.setMaxVersions();
+    RegionScanner scanner = null;
+    try {
+      scanner = new FlowScanner(region, scan.getBatch(),
+          region.getScanner(scan));
+      scanner.next(results);
+      // skip the default Get path; results are already populated
+      e.bypass();
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Ensures that max versions are set for the Scan so that metrics can be
+   * correctly aggregated and min/max can be correctly determined.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
+   * .apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   * org.apache.hadoop.hbase.regionserver.RegionScanner)
+   */
+  @Override
+  public RegionScanner preScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+      RegionScanner s) throws IOException {
+    // set max versions for scan to see all
+    // versions to aggregate for metrics
+    scan.setMaxVersions();
+    return s;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates a {@link FlowScanner} Scan so that it can correctly process the
+   * contents of {@link FlowRunTable}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
+   * org.apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   * org.apache.hadoop.hbase.regionserver.RegionScanner)
+   */
+  @Override
+  public RegionScanner postScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+      RegionScanner scanner) throws IOException {
+    return new FlowScanner(region, scan.getBatch(), scanner);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
new file mode 100644
index 0000000..e133241
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+/**
+ * Represents a rowkey for the flow run table.
+ */
+public class FlowRunRowKey {
+ // TODO: more methods are needed for this class like parse row key
+
+ /**
+ * Constructs a row key for the entity table as follows: {
+ * clusterId!userI!flowId!Inverted Flow Run Id}
+ *
+ * @param clusterId
+ * @param userId
+ * @param flowId
+ * @param flowRunId
+ * @return byte array with the row key
+ */
+ public static byte[] getRowKey(String clusterId, String userId,
+ String flowId, Long flowRunId) {
+ byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId,
+ userId, flowId));
+ // Note that flowRunId is a long, so we can't encode them all at the same
+ // time.
+ byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+ return Separator.QUALIFIERS.join(first, second);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
new file mode 100644
index 0000000..b1b93c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow run table has column family info
+ * Stores per flow run information
+ * aggregated across applications.
+ *
+ * Metrics are also stored in the info column family.
+ *
+ * Example flow run table record:
+ *
+ * <pre>
+ * flow_run table
+ * |-------------------------------------------|
+ * | Row key | Column Family |
+ * | | info |
+ * |-------------------------------------------|
+ * | clusterId! | flow_version:version7 |
+ * | userName! | |
+ * | flowId! | running_apps:1 |
+ * | flowRunId | |
+ * | | min_start_time:1392995080000 |
+ * | | #0:"" |
+ * | | |
+ * | | min_start_time:1392995081012 |
+ * | | #0:appId2 |
+ * | | |
+ * | | min_start_time:1392993083210 |
+ * | | #0:appId3 |
+ * | | |
+ * | | |
+ * | | max_end_time:1392993084018 |
+ * | | #0:"" |
+ * | | |
+ * | | |
+ * | | m!mapInputRecords:127 |
+ * | | #0:"" |
+ * | | |
+ * | | m!mapInputRecords:31 |
+ * | | #2:appId2 |
+ * | | |
+ * | | m!mapInputRecords:37 |
+ * | | #1:appId3 |
+ * | | |
+ * | | |
+ * | | m!mapOutputRecords:181 |
+ * | | #0:"" |
+ * | | |
+ * | | m!mapOutputRecords:37 |
+ * | | #1:appId3 |
+ * | | |
+ * | | |
+ * |-------------------------------------------|
+ * </pre>
+ */
+public class FlowRunTable extends BaseTable<FlowRunTable> {
+ /** entity prefix */
+ private static final String PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
+
+ /** config param name that specifies the flowrun table name */
+ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+ /** default value for flowrun table name */
+ public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
+
+ private static final Log LOG = LogFactory.getLog(FlowRunTable.class);
+
+ /** default max number of versions */
+ public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+ public FlowRunTable() {
+ super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+ * (org.apache.hadoop.hbase.client.Admin,
+ * org.apache.hadoop.conf.Configuration)
+ */
+ public void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException {
+
+ TableName table = getTableName(hbaseConf);
+ if (admin.tableExists(table)) {
+ // do not disable / delete existing table
+ // similar to the approach taken by map-reduce jobs when
+ // output directory exists
+ throw new IOException("Table " + table.getNameAsString()
+ + " already exists.");
+ }
+
+ HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
+ HColumnDescriptor infoCF =
+ new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
+ infoCF.setBloomFilterType(BloomType.ROWCOL);
+ flowRunTableDescp.addFamily(infoCF);
+ infoCF.setMinVersions(1);
+ infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+
+ // TODO: figure the split policy
+ flowRunTableDescp.addCoprocessor(FlowRunCoprocessor.class
+ .getCanonicalName());
+ admin.createTable(flowRunTableDescp);
+ LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ + admin.tableExists(table));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
new file mode 100644
index 0000000..a1948aa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Invoked via the coprocessor when a Get or a Scan is issued for flow run
+ * table. Looks through the list of cells per row, checks their tags and does
+ * operation on those cells as per the cell tags. Transforms reads of the stored
+ * metrics into calculated sums for each column Also, finds the min and max for
+ * start and end times in a flow run.
+ */
+class FlowScanner implements RegionScanner, Closeable {
+
+ private static final Log LOG = LogFactory.getLog(FlowScanner.class);
+
+ private final HRegion region;
+ private final InternalScanner flowRunScanner;
+ private RegionScanner regionScanner;
+ private final int limit;
+ private boolean hasMore;
+ private byte[] currentRow;
+ private List<Cell> availableCells = new ArrayList<>();
+ private int currentIndex;
+
+ FlowScanner(HRegion region, int limit, InternalScanner internalScanner) {
+ this.region = region;
+ this.limit = limit;
+ this.flowRunScanner = internalScanner;
+ if (internalScanner instanceof RegionScanner) {
+ this.regionScanner = (RegionScanner) internalScanner;
+ }
+ // TODO: note if it's compaction/flush
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
+ */
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return region.getRegionInfo();
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> cells) throws IOException {
+ return nextRaw(cells, limit);
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> cells, int limit) throws IOException {
+ return nextInternal(cells, limit);
+ }
+
+ @Override
+ public boolean next(List<Cell> cells) throws IOException {
+ return next(cells, limit);
+ }
+
+ @Override
+ public boolean next(List<Cell> cells, int limit) throws IOException {
+ return nextInternal(cells, limit);
+ }
+
+ private String getAggregationCompactionDimension(List<Tag> tags) {
+ String appId = null;
+ for (Tag t : tags) {
+ if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+ .getType()) {
+ appId = Bytes.toString(t.getValue());
+ }
+ }
+ return appId;
+ }
+
+ /**
+ * This method loops through the cells in a given row of the
+ * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
+ * to process the contents. It then calculates the sum or min or max for each
+ * column or returns the cell as is.
+ *
+ * @param cells
+ * @param limit
+ * @return true if next row is available for the scanner, false otherwise
+ * @throws IOException
+ */
+ private boolean nextInternal(List<Cell> cells, int limit) throws IOException {
+ Cell cell = null;
+ startNext();
+ // Loop through all the cells in this row
+ // For min/max/metrics we do need to scan the entire set of cells to get the
+ // right one
+ // But with flush/compaction, the number of cells being scanned will go down
+ // cells are grouped per column qualifier then sorted by cell timestamp
+ // (latest to oldest) per column qualifier
+ // So all cells in one qualifier come one after the other before we see the
+ // next column qualifier
+ ByteArrayComparator comp = new ByteArrayComparator();
+ byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES;
+ AggregationOperation currentAggOp = null;
+ SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
+ Set<String> alreadySeenAggDim = new HashSet<>();
+ int addedCnt = 0;
+ while (((cell = peekAtNextCell(limit)) != null)
+ && (limit <= 0 || addedCnt < limit)) {
+ byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
+ if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
+ addedCnt += emitCells(cells, currentColumnCells, currentAggOp);
+ resetState(currentColumnCells, alreadySeenAggDim);
+ currentColumnQualifier = newColumnQualifier;
+ currentAggOp = getCurrentAggOp(cell);
+ }
+ collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim);
+ nextCell(limit);
+ }
+ if (!currentColumnCells.isEmpty()) {
+ emitCells(cells, currentColumnCells, currentAggOp);
+ }
+ return hasMore();
+ }
+
+ private AggregationOperation getCurrentAggOp(Cell cell) {
+ List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ // We assume that all the operations for a particular column are the same
+ return TimelineWriterUtils.getAggregationOperationFromTagsList(tags);
+ }
+
+ /**
+ * resets the parameters to an intialized state for next loop iteration
+ *
+ * @param cell
+ * @param currentAggOp
+ * @param currentColumnCells
+ * @param alreadySeenAggDim
+ * @param collectedButNotEmitted
+ */
+ private void resetState(SortedSet<Cell> currentColumnCells,
+ Set<String> alreadySeenAggDim) {
+ currentColumnCells.clear();
+ alreadySeenAggDim.clear();
+ }
+
+ private void collectCells(SortedSet<Cell> currentColumnCells,
+ AggregationOperation currentAggOp, Cell cell,
+ Set<String> alreadySeenAggDim) throws IOException {
+ if (currentAggOp == null) {
+ // not a min/max/metric cell, so just return it as is
+ currentColumnCells.add(cell);
+ nextCell(limit);
+ return;
+ }
+
+ switch (currentAggOp) {
+ case MIN:
+ if (currentColumnCells.size() == 0) {
+ currentColumnCells.add(cell);
+ } else {
+ Cell currentMinCell = currentColumnCells.first();
+ Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp);
+ if (!currentMinCell.equals(newMinCell)) {
+ currentColumnCells.remove(currentMinCell);
+ currentColumnCells.add(newMinCell);
+ }
+ }
+ break;
+ case MAX:
+ if (currentColumnCells.size() == 0) {
+ currentColumnCells.add(cell);
+ } else {
+ Cell currentMaxCell = currentColumnCells.first();
+ Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp);
+ if (!currentMaxCell.equals(newMaxCell)) {
+ currentColumnCells.remove(currentMaxCell);
+ currentColumnCells.add(newMaxCell);
+ }
+ }
+ break;
+ case SUM:
+ case SUM_FINAL:
+ // only if this app has not been seen yet, add to current column cells
+ List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ String aggDim = getAggregationCompactionDimension(tags);
+ if (alreadySeenAggDim.contains(aggDim)) {
+ // if this agg dimension has already been seen,
+ // since they show up in sorted order
+ // we drop the rest which are older
+ // in other words, this cell is older than previously seen cells
+ // for that agg dim
+ } else {
+ // not seen this agg dim, hence consider this cell in our working set
+ currentColumnCells.add(cell);
+ alreadySeenAggDim.add(aggDim);
+ }
+ break;
+ default:
+ break;
+ } // end of switch case
+ }
+
+ /*
+ * Processes the cells in input param currentColumnCells and populates
+ * List<Cell> cells as the output based on the input AggregationOperation
+ * parameter.
+ */
+ private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
+ AggregationOperation currentAggOp) throws IOException {
+ if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
+ return 0;
+ }
+ if (currentAggOp == null) {
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ }
+
+ switch (currentAggOp) {
+ case MIN:
+ case MAX:
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ case SUM:
+ case SUM_FINAL:
+ Cell sumCell = processSummation(currentColumnCells);
+ cells.add(sumCell);
+ return 1;
+ default:
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ }
+ }
+
+ /*
+ * Returns a cell whose value is the sum of all cell values in the input set.
+ * The new cell created has the timestamp of the most recent metric cell. The
+ * sum of a metric for a flow run is the summation at the point of the last
+ * metric update in that flow till that time.
+ */
+ private Cell processSummation(SortedSet<Cell> currentColumnCells)
+ throws IOException {
+ Number sum = 0;
+ Number currentValue = 0;
+ long ts = 0L;
+ long mostCurrentTimestamp = 0l;
+ Cell mostRecentCell = null;
+ for (Cell cell : currentColumnCells) {
+ currentValue = (Number) GenericObjectMapper.read(CellUtil
+ .cloneValue(cell));
+ ts = cell.getTimestamp();
+ if (mostCurrentTimestamp < ts) {
+ mostCurrentTimestamp = ts;
+ mostRecentCell = cell;
+ }
+ sum = sum.longValue() + currentValue.longValue();
+ }
+ Cell sumCell = createNewCell(mostRecentCell, sum);
+ return sumCell;
+ }
+
+ /**
+ * Determines which cell is to be returned based on the values in each cell
+ * and the comparison operation MIN or MAX.
+ *
+ * @param previouslyChosenCell
+ * @param currentCell
+ * @param currentAggOp
+ * @return the cell which is the min (or max) cell
+ * @throws IOException
+ */
+ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
+ AggregationOperation currentAggOp) throws IOException {
+ if (previouslyChosenCell == null) {
+ return currentCell;
+ }
+ try {
+ long previouslyChosenCellValue = ((Number) GenericObjectMapper
+ .read(CellUtil.cloneValue(previouslyChosenCell))).longValue();
+ long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil
+ .cloneValue(currentCell))).longValue();
+ switch (currentAggOp) {
+ case MIN:
+ if (currentCellValue < previouslyChosenCellValue) {
+ // new value is minimum, hence return this cell
+ return currentCell;
+ } else {
+ // previously chosen value is miniumum, hence return previous min cell
+ return previouslyChosenCell;
+ }
+ case MAX:
+ if (currentCellValue > previouslyChosenCellValue) {
+ // new value is max, hence return this cell
+ return currentCell;
+ } else {
+ // previously chosen value is max, hence return previous max cell
+ return previouslyChosenCell;
+ }
+ default:
+ return currentCell;
+ }
+ } catch (IllegalArgumentException iae) {
+ LOG.error("caught iae during conversion to long ", iae);
+ return currentCell;
+ }
+ }
+
+ private Cell createNewCell(Cell origCell, Number number) throws IOException {
+ byte[] newValue = GenericObjectMapper.write(number);
+ return CellUtil.createCell(CellUtil.cloneRow(origCell),
+ CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+ origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+ }
+
+ @Override
+ public void close() throws IOException {
+ flowRunScanner.close();
+ }
+
+ /**
+ * Called to signal the start of the next() call by the scanner.
+ */
+ public void startNext() {
+ currentRow = null;
+ }
+
+ /**
+ * Returns whether or not the underlying scanner has more rows.
+ */
+ public boolean hasMore() {
+ return currentIndex < availableCells.size() ? true : hasMore;
+ }
+
+ /**
+ * Returns the next available cell for the current row and advances the
+ * pointer to the next cell. This method can be called multiple times in a row
+ * to advance through all the available cells.
+ *
+ * @param limit
+ * the limit of number of cells to return if the next batch must be
+ * fetched by the wrapped scanner
+ * @return the next available cell or null if no more cells are available for
+ * the current row
+ * @throws IOException
+ */
+ public Cell nextCell(int limit) throws IOException {
+ Cell cell = peekAtNextCell(limit);
+ if (cell != null) {
+ currentIndex++;
+ }
+ return cell;
+ }
+
+ /**
+ * Returns the next available cell for the current row, without advancing the
+ * pointer. Calling this method multiple times in a row will continue to
+ * return the same cell.
+ *
+ * @param limit
+ * the limit of number of cells to return if the next batch must be
+ * fetched by the wrapped scanner
+ * @return the next available cell or null if no more cells are available for
+ * the current row
+ * @throws IOException
+ */
+ public Cell peekAtNextCell(int limit) throws IOException {
+ if (currentIndex >= availableCells.size()) {
+ // done with current batch
+ availableCells.clear();
+ currentIndex = 0;
+ hasMore = flowRunScanner.next(availableCells, limit);
+ }
+ Cell cell = null;
+ if (currentIndex < availableCells.size()) {
+ cell = availableCells.get(currentIndex);
+ if (currentRow == null) {
+ currentRow = CellUtil.cloneRow(cell);
+ } else if (!CellUtil.matchingRow(cell, currentRow)) {
+ // moved on to the next row
+ // don't use the current cell
+ // also signal no more cells for this row
+ return null;
+ }
+ }
+ return cell;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
+ */
+ @Override
+ public long getMaxResultSize() {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.isFilterDone() called when the flow "
+ + "scanner's scanner is not a RegionScanner");
+ }
+ return regionScanner.getMaxResultSize();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
+ */
+ @Override
+ public long getMvccReadPoint() {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.isFilterDone() called when the flow "
+ + "scanner's internal scanner is not a RegionScanner");
+ }
+ return regionScanner.getMvccReadPoint();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
+ */
+ @Override
+ public boolean isFilterDone() throws IOException {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.isFilterDone() called when the flow "
+ + "scanner's internal scanner is not a RegionScanner");
+ }
+ return regionScanner.isFilterDone();
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
+ */
+ @Override
+ public boolean reseek(byte[] bytes) throws IOException {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.reseek() called when the flow "
+ + "scanner's internal scanner is not a RegionScanner");
+ }
+ return regionScanner.reseek(bytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 2875e01..3962341 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
@@ -88,20 +87,15 @@ public class TestHBaseTimelineStorage {
}
private static void createSchema() throws IOException {
- new EntityTable()
- .createTable(util.getHBaseAdmin(), util.getConfiguration());
- new AppToFlowTable()
- .createTable(util.getHBaseAdmin(), util.getConfiguration());
- new ApplicationTable()
- .createTable(util.getHBaseAdmin(), util.getConfiguration());
+ TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
@Test
public void testWriteApplicationToHBase() throws Exception {
TimelineEntities te = new TimelineEntities();
ApplicationEntity entity = new ApplicationEntity();
- String id = "hello";
- entity.setId(id);
+ String appId = "application_1000178881110_2002";
+ entity.setId(appId);
long cTime = 1425016501000L;
long mTime = 1425026901000L;
entity.setCreatedTime(cTime);
@@ -173,12 +167,12 @@ public class TestHBaseTimelineStorage {
String flow = "some_flow_name";
String flowVersion = "AB7822C10F1111";
long runid = 1002345678919L;
- hbi.write(cluster, user, flow, flowVersion, runid, id, te);
+ hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
hbi.stop();
// retrieve the row
byte[] rowKey =
- ApplicationRowKey.getRowKey(cluster, user, flow, runid, id);
+ ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
Connection conn = ConnectionFactory.createConnection(c1);
@@ -190,11 +184,11 @@ public class TestHBaseTimelineStorage {
// check the row key
byte[] row1 = result.getRow();
assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
- id));
+ appId));
// check info column family
String id1 = ApplicationColumn.ID.readResult(result).toString();
- assertEquals(id, id1);
+ assertEquals(appId, id1);
Number val =
(Number) ApplicationColumn.CREATED_TIME.readResult(result);
@@ -252,17 +246,17 @@ public class TestHBaseTimelineStorage {
assertEquals(metricValues, metricMap);
// read the timeline entity using the reader this time
- TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
+ TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL));
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
- id, entity.getType(), null, null, null, null, null, null, null,
+ appId, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());
// verify attributes
- assertEquals(id, e1.getId());
+ assertEquals(appId, e1.getId());
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
e1.getType());
assertEquals(cTime, e1.getCreatedTime());
@@ -576,7 +570,7 @@ public class TestHBaseTimelineStorage {
String flow = "other_flow_name";
String flowVersion = "1111F01C2287BA";
long runid = 1009876543218L;
- String appName = "some app name";
+ String appName = "application_123465899910_1001";
hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
hbi.stop();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
new file mode 100644
index 0000000..f8331fa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+
+/**
+ * Generates the test data/entities for the FlowRun and FlowActivity tables.
+ */
+class TestFlowDataGenerator {
+
+ // Metric ids shared by the generated entities; reused so writer/reader
+ // tests can aggregate values for the same metric across applications.
+ private final static String metric1 = "MAP_SLOT_MILLIS";
+ private final static String metric2 = "HDFS_BYTES_READ";
+
+
+ /**
+ * Builds a YARN_APPLICATION entity ("flowRunMetrics_test") carrying two
+ * TIME_SERIES metrics ({@code MAP_SLOT_MILLIS}, {@code HDFS_BYTES_READ}),
+ * each with two values keyed on timestamps relative to "now".
+ *
+ * NOTE(review): the values here autobox to Integer while
+ * getEntityMetricsApp2 uses Long for the same metric id — confirm the
+ * readers tolerate mixed Number subtypes when aggregating.
+ *
+ * @return a populated application entity with metrics
+ */
+ static TimelineEntity getEntityMetricsApp1() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunMetrics_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 100000, 2);
+ metricValues.put(ts - 80000, 40);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+
+ // second metric: same map variable is reassigned to a fresh HashMap, so
+ // m1 and m2 do not share value maps
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId(metric2);
+ metricValues = new HashMap<Long, Number>();
+ ts = System.currentTimeMillis();
+ metricValues.put(ts - 100000, 31);
+ metricValues.put(ts - 80000, 57);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+
+ entity.addMetrics(metrics);
+ return entity;
+ }
+
+ /**
+ * Builds a second YARN_APPLICATION entity with the SAME id
+ * ("flowRunMetrics_test") but only one metric ({@code MAP_SLOT_MILLIS})
+ * with Long values — presumably paired with getEntityMetricsApp1 to
+ * exercise aggregation across writes for one application; verify against
+ * the calling test.
+ *
+ * @return a populated application entity with a single metric
+ */
+ static TimelineEntity getEntityMetricsApp2() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunMetrics_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 100000, 5L);
+ metricValues.put(ts - 80000, 101L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ entity.addMetrics(metrics);
+ return entity;
+ }
+
+ /**
+ * Builds a YARN_APPLICATION entity ("flowRunHello") with created/modified
+ * times, a six-point {@code MAP_SLOT_MILLIS} time series, and a pair of
+ * CREATED and FINISHED events.
+ *
+ * NOTE(review): the FINISHED event timestamp (1436512801000L) is one
+ * second BEFORE the CREATED event timestamp (1436512802000L) — looks
+ * intentional for min/max-time testing, but confirm against the
+ * assertions that consume this entity.
+ *
+ * @return a populated application entity with metrics and lifecycle events
+ */
+ static TimelineEntity getEntity1() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunHello";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 20000000000000L;
+ Long mTime = 1425026901000L;
+ entity.setCreatedTime(cTime);
+ entity.setModifiedTime(mTime);
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 120000, 100000000);
+ metricValues.put(ts - 100000, 200000000);
+ metricValues.put(ts - 80000, 300000000);
+ metricValues.put(ts - 60000, 400000000);
+ metricValues.put(ts - 40000, 50000000000L);
+ metricValues.put(ts - 20000, 60000000000L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ entity.addMetrics(metrics);
+
+ // application-created event with one info key/value attached
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ Long expTs = 1436512802000L;
+ event.setTimestamp(expTs);
+ String expKey = "foo_event";
+ Object expVal = "test";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ // application-finished event reusing the same info key/value
+ event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ event.setTimestamp(1436512801000L);
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ return entity;
+ }
+
+ /**
+ * Builds an application entity whose created time (30000000000000L) is
+ * larger than getEntity1's (20000000000000L), with a single CREATED
+ * event — used to exercise flow-run start-time handling.
+ *
+ * @return an application entity with a late start time
+ */
+ static TimelineEntity getEntityGreaterStartTime() {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setCreatedTime(30000000000000L);
+ entity.setId("flowRunHello with greater start time");
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setType(type);
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ long endTs = 1439379885000L;
+ event.setTimestamp(endTs);
+ String expKey = "foo_event_greater";
+ String expVal = "test_app_greater";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+ return entity;
+ }
+
+ /**
+ * Builds an application entity carrying only a FINISHED event stamped
+ * with the caller-supplied end time — used to exercise flow-run
+ * max-end-time handling.
+ *
+ * @param endTs timestamp to record on the FINISHED event
+ * @return an application entity with the given finish time
+ */
+ static TimelineEntity getEntityMaxEndTime(long endTs) {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setId("flowRunHello Max End time");
+ entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ event.setTimestamp(endTs);
+ String expKey = "foo_even_max_ finished";
+ String expVal = "test_app_max_finished";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+ return entity;
+ }
+
+ /**
+ * Builds an application entity whose created time (10000000000000L) is
+ * smaller than the other generated entities', plus a CREATED event
+ * stamped "now" — used to exercise flow-run min-start-time handling.
+ *
+ * @return an application entity with an early start time
+ */
+ static TimelineEntity getEntityMinStartTime() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunHelloMInStartTime";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 10000000000000L;
+ entity.setCreatedTime(cTime);
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event.setTimestamp(System.currentTimeMillis());
+ entity.addEvent(event);
+ return entity;
+ }
+
+
+ /**
+ * Builds a minimal application entity ("flowActivity_test") with a
+ * created time and a single CREATED event carrying one info key/value —
+ * no metrics — for the flow-activity table tests.
+ *
+ * @return a minimal application entity
+ */
+ static TimelineEntity getFlowApp1() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowActivity_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ Long expTs = 1436512802000L;
+ event.setTimestamp(expTs);
+ String expKey = "foo_event";
+ Object expVal = "test";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ return entity;
+ }
+
+}