You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:52 UTC
[35/50] [abbrv] hadoop git commit: YARN-3862. Support for fetching
specific configs and metrics based on prefixes (Varun Saxena via sjlee)
YARN-3862. Support for fetching specific configs and metrics based on prefixes (Varun Saxena via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/76bc71cc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/76bc71cc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/76bc71cc
Branch: refs/heads/feature-YARN-2928
Commit: 76bc71cc7abe776420223a78f2d885800410ddc6
Parents: 54a529d
Author: Sangjin Lee <sj...@apache.org>
Authored: Tue Dec 1 21:47:43 2015 -0800
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:59:39 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../reader/TimelineReaderManager.java | 4 +-
.../reader/filter/TimelineCompareFilter.java | 61 ++
.../reader/filter/TimelineCompareOp.java | 36 +
.../reader/filter/TimelineFilter.java | 56 ++
.../reader/filter/TimelineFilterList.java | 91 +++
.../reader/filter/TimelineFilterUtils.java | 120 ++++
.../reader/filter/TimelinePrefixFilter.java | 56 ++
.../reader/filter/package-info.java | 28 +
.../storage/ApplicationEntityReader.java | 123 +++-
.../storage/FileSystemTimelineReaderImpl.java | 9 +-
.../storage/FlowActivityEntityReader.java | 16 +-
.../storage/FlowRunEntityReader.java | 69 +-
.../storage/GenericEntityReader.java | 119 +++-
.../storage/HBaseTimelineReaderImpl.java | 11 +-
.../storage/TimelineEntityReader.java | 32 +-
.../storage/TimelineEntityReaderFactory.java | 23 +-
.../timelineservice/storage/TimelineReader.java | 32 +
.../application/ApplicationColumnPrefix.java | 18 +-
.../storage/common/ColumnPrefix.java | 29 +-
.../storage/entity/EntityColumnPrefix.java | 18 +-
.../storage/flow/FlowActivityColumnPrefix.java | 18 +-
.../storage/flow/FlowRunColumnPrefix.java | 18 +-
.../TestFileSystemTimelineReaderImpl.java | 42 +-
.../storage/TestHBaseTimelineStorage.java | 682 +++++++++++++++++--
.../flow/TestHBaseStorageFlowActivity.java | 6 +-
.../storage/flow/TestHBaseStorageFlowRun.java | 190 +++++-
27 files changed, 1761 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7636317..78705e1 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -143,6 +143,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-4053. Change the way metric values are stored in HBase Storage (Varun
Saxena via sjlee)
+ YARN-3862. Support for fetching specific configs and metrics based on
+ prefixes (Varun Saxena via sjlee)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
index 27a50d5..294b05b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
@@ -77,7 +77,7 @@ public class TimelineReaderManager extends AbstractService {
return reader.getEntities(userId, cluster, flowId, flowRunId, appId,
entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
- metricFilters, eventFilters, fieldsToRetrieve);
+ metricFilters, eventFilters, null, null, fieldsToRetrieve);
}
/**
@@ -91,6 +91,6 @@ public class TimelineReaderManager extends AbstractService {
String entityId, EnumSet<Field> fields) throws IOException {
String cluster = getClusterID(clusterId, getConfig());
return reader.getEntity(userId, cluster, flowId, flowRunId, appId,
- entityType, entityId, fields);
+ entityType, entityId, null, null, fields);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java
new file mode 100644
index 0000000..14e7124
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Filter class which represents filter to be applied based on key-value pair
+ * and the relation between them represented by different relational operators.
+ */
+@Private
+@Unstable
+public class TimelineCompareFilter extends TimelineFilter {
+
+ private TimelineCompareOp compareOp;
+ private String key;
+ private Object value;
+
+ public TimelineCompareFilter() {
+ }
+
+ public TimelineCompareFilter(TimelineCompareOp op, String key, Object val) {
+ this.compareOp = op;
+ this.key = key;
+ this.value = val;
+ }
+
+ @Override
+ public TimelineFilterType getFilterType() {
+ return TimelineFilterType.COMPARE;
+ }
+
+ public TimelineCompareOp getCompareOp() {
+ return compareOp;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java
new file mode 100644
index 0000000..461a7d8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Comparison Operators.
+ */
+@Private
+@Unstable
+public enum TimelineCompareOp {
+ LESS_THAN,
+ LESS_OR_EQUAL,
+ EQUAL,
+ NOT_EQUAL,
+ GREATER_OR_EQUAL,
+ GREATER_THAN
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java
new file mode 100644
index 0000000..d4b4045
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Abstract base class extended to implement timeline filters.
+ */
+@Private
+@Unstable
+public abstract class TimelineFilter {
+
+ /**
+ * Lists the different filter types.
+ */
+ @Private
+ @Unstable
+ public enum TimelineFilterType {
+ /**
+ * Combines multiple filters.
+ */
+ LIST,
+ /**
+ * Filter which is used for comparison.
+ */
+ COMPARE,
+ /**
+ * Filter which matches prefix for a config or a metric.
+ */
+ PREFIX
+ }
+
+ public abstract TimelineFilterType getFilterType();
+
+ public String toString() {
+ return this.getClass().getSimpleName();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java
new file mode 100644
index 0000000..8727bd7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Implementation of {@link TimelineFilter} that represents an ordered list of
+ * timeline filters which will then be evaluated with a specified boolean
+ * operator {@link Operator#AND} or {@link Operator#OR}. Since you can use
+ * timeline filter lists as children of timeline filter lists, you can create a
+ * hierarchy of filters to be evaluated.
+ */
+@Private
+@Unstable
+public class TimelineFilterList extends TimelineFilter {
+ /**
+ * Specifies how filters in the filter list will be evaluated. AND means all
+ * the filters should match and OR means at least one should match.
+ */
+ @Private
+ @Unstable
+ public static enum Operator {
+ AND,
+ OR
+ }
+
+ private Operator operator;
+ private List<TimelineFilter> filterList = new ArrayList<TimelineFilter>();
+
+ public TimelineFilterList(TimelineFilter...filters) {
+ this(Operator.AND, filters);
+ }
+
+ public TimelineFilterList(Operator op, TimelineFilter...filters) {
+ this.operator = op;
+ this.filterList = new ArrayList<TimelineFilter>(Arrays.asList(filters));
+ }
+
+ @Override
+ public TimelineFilterType getFilterType() {
+ return TimelineFilterType.LIST;
+ }
+
+ /**
+ * Get the filter list.
+ *
+ * @return filterList
+ */
+ public List<TimelineFilter> getFilterList() {
+ return filterList;
+ }
+
+ /**
+ * Get the operator.
+ *
+ * @return operator
+ */
+ public Operator getOperator() {
+ return operator;
+ }
+
+ public void setOperator(Operator op) {
+ operator = op;
+ }
+
+ public void addFilter(TimelineFilter filter) {
+ filterList.add(filter);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
new file mode 100644
index 0000000..da3c383
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+
+/**
+ * Set of utility methods used by timeline filter classes.
+ */
+public final class TimelineFilterUtils {
+
+ private TimelineFilterUtils() {
+ }
+
+ /**
+ * Returns the equivalent HBase filter list's {@link Operator}.
+ * @param op timeline filter list operator to be converted.
+ * @return HBase filter list's Operator.
+ */
+ private static Operator getHBaseOperator(TimelineFilterList.Operator op) {
+ switch (op) {
+ case AND:
+ return Operator.MUST_PASS_ALL;
+ case OR:
+ return Operator.MUST_PASS_ONE;
+ default:
+ throw new IllegalArgumentException("Invalid operator");
+ }
+ }
+
+ /**
+ * Returns the equivalent HBase compare filter's {@link CompareOp}.
+ * @param op timeline compare operator to be converted.
+ * @return HBase compare filter's CompareOp.
+ */
+ private static CompareOp getHBaseCompareOp(
+ TimelineCompareOp op) {
+ switch (op) {
+ case LESS_THAN:
+ return CompareOp.LESS;
+ case LESS_OR_EQUAL:
+ return CompareOp.LESS_OR_EQUAL;
+ case EQUAL:
+ return CompareOp.EQUAL;
+ case NOT_EQUAL:
+ return CompareOp.NOT_EQUAL;
+ case GREATER_OR_EQUAL:
+ return CompareOp.GREATER_OR_EQUAL;
+ case GREATER_THAN:
+ return CompareOp.GREATER;
+ default:
+ throw new IllegalArgumentException("Invalid compare operator");
+ }
+ }
+
+ /**
+ * Converts a {@link TimelinePrefixFilter} to an equivalent HBase
+ * {@link QualifierFilter}.
+ * @param colPrefix column prefix used to derive the column qualifier bytes.
+ * @param filter timeline prefix filter to be converted.
+ * @return a {@link QualifierFilter} object
+ */
+ private static <T> Filter createHBaseColQualPrefixFilter(
+ ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) {
+ return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()),
+ new BinaryPrefixComparator(
+ colPrefix.getColumnPrefixBytes(filter.getPrefix())));
+ }
+
+ /**
+ * Creates equivalent HBase {@link FilterList} from {@link TimelineFilterList}
+ * while converting different timeline filters(of type {@link TimelineFilter})
+ * into their equivalent HBase filters.
+ * @param colPrefix column prefix used to derive the column qualifier bytes.
+ * @param filterList timeline filter list to be converted.
+ * @return a {@link FilterList} object
+ */
+ public static <T> FilterList createHBaseFilterList(ColumnPrefix<T> colPrefix,
+ TimelineFilterList filterList) {
+ FilterList list =
+ new FilterList(getHBaseOperator(filterList.getOperator()));
+ for (TimelineFilter filter : filterList.getFilterList()) {
+ switch(filter.getFilterType()) {
+ case LIST:
+ list.addFilter(
+ createHBaseFilterList(colPrefix, (TimelineFilterList)filter));
+ break;
+ case PREFIX:
+ list.addFilter(createHBaseColQualPrefixFilter(
+ colPrefix, (TimelinePrefixFilter)filter));
+ break;
+ default:
+ break;
+ }
+ }
+ return list;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java
new file mode 100644
index 0000000..6233f26
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Filter class which represents filter to be applied based on prefixes.
+ * Prefixes can either match or not match.
+ */
+@Private
+@Unstable
+public class TimelinePrefixFilter extends TimelineFilter {
+
+ private TimelineCompareOp compareOp;
+ private String prefix;
+
+ public TimelinePrefixFilter(TimelineCompareOp op, String prefix) {
+ this.prefix = prefix;
+ if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) {
+ throw new IllegalArgumentException("CompareOp for prefix filter should " +
+ "be EQUAL or NOT_EQUAL");
+ }
+ this.compareOp = op;
+ }
+
+ @Override
+ public TimelineFilterType getFilterType() {
+ return TimelineFilterType.PREFIX;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+
+ public TimelineCompareOp getCompareOp() {
+ return compareOp;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
new file mode 100644
index 0000000..f7c0705
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.reader.filter stores
+ * timeline filter implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
index 8324afd..7082a5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
@@ -28,11 +28,21 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
@@ -56,18 +66,21 @@ class ApplicationEntityReader extends GenericEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve, true);
+ eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
+ true);
}
public ApplicationEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve) {
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
- fieldsToRetrieve);
+ confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
}
/**
@@ -78,13 +91,95 @@ class ApplicationEntityReader extends GenericEntityReader {
}
@Override
- protected Result getResult(Configuration hbaseConf, Connection conn)
- throws IOException {
+ protected FilterList constructFilterListBasedOnFields() {
+ FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+ // Fetch all the columns.
+ if (fieldsToRetrieve.contains(Field.ALL) &&
+ (confsToRetrieve == null ||
+ confsToRetrieve.getFilterList().isEmpty()) &&
+ (metricsToRetrieve == null ||
+ metricsToRetrieve.getFilterList().isEmpty())) {
+ return list;
+ }
+ FilterList infoColFamilyList = new FilterList();
+ // By default fetch everything in INFO column family.
+ FamilyFilter infoColumnFamily =
+ new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
+ infoColFamilyList.addFilter(infoColumnFamily);
+ // Events not required.
+ if (!fieldsToRetrieve.contains(Field.EVENTS) &&
+ !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ ApplicationColumnPrefix.EVENT.getColumnPrefixBytes(""))));
+ }
+ // info not required.
+ if (!fieldsToRetrieve.contains(Field.INFO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ ApplicationColumnPrefix.INFO.getColumnPrefixBytes(""))));
+ }
+ // is related to not required.
+ if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
+ }
+ // relates to not required.
+ if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
+ }
+ list.addFilter(infoColFamilyList);
+ if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
+ (confsToRetrieve != null &&
+ !confsToRetrieve.getFilterList().isEmpty())) {
+ FilterList filterCfg =
+ new FilterList(new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes())));
+ if (confsToRetrieve != null &&
+ !confsToRetrieve.getFilterList().isEmpty()) {
+ filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ ApplicationColumnPrefix.CONFIG, confsToRetrieve));
+ }
+ list.addFilter(filterCfg);
+ }
+ if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
+ (metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty())) {
+ FilterList filterMetrics =
+ new FilterList(new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes())));
+ if (metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ ApplicationColumnPrefix.METRIC, metricsToRetrieve));
+ }
+ list.addFilter(filterMetrics);
+ }
+ return list;
+ }
+
+ @Override
+ protected Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
byte[] rowKey =
ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
appId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ get.setFilter(filterList);
+ }
return table.getResult(hbaseConf, conn, get);
}
@@ -115,6 +210,15 @@ class ApplicationEntityReader extends GenericEntityReader {
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.noneOf(Field.class);
}
+ if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
+ confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
+ fieldsToRetrieve.add(Field.CONFIGS);
+ }
+ if (!fieldsToRetrieve.contains(Field.METRICS) &&
+ metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ fieldsToRetrieve.add(Field.METRICS);
+ }
if (!singleEntityRead) {
if (limit == null || limit < 0) {
limit = TimelineReader.DEFAULT_LIMIT;
@@ -136,7 +240,7 @@ class ApplicationEntityReader extends GenericEntityReader {
@Override
protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn) throws IOException {
+ Connection conn, FilterList filterList) throws IOException {
Scan scan = new Scan();
if (flowRunId != null) {
scan.setRowPrefixFilter(ApplicationRowKey.
@@ -145,7 +249,12 @@ class ApplicationEntityReader extends GenericEntityReader {
scan.setRowPrefixFilter(ApplicationRowKey.
getRowKeyPrefix(clusterId, userId, flowId));
}
- scan.setFilter(new PageFilter(limit));
+ FilterList newList = new FilterList();
+ newList.addFilter(new PageFilter(limit));
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ newList.addFilter(filterList);
+ }
+ scan.setFilter(newList);
return table.getResultScanner(hbaseConf, conn, scan);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index 30d1d00..48bf844 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.JsonGenerationException;
@@ -272,6 +273,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException {
if (limit == null || limit <= 0) {
limit = DEFAULT_LIMIT;
@@ -386,7 +388,9 @@ public class FileSystemTimelineReaderImpl extends AbstractService
@Override
public TimelineEntity getEntity(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve) throws IOException {
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve)
+ throws IOException {
String flowRunPath = getFlowRunPath(userId, clusterId, flowId,
flowRunId, appId);
File dir = new File(new File(rootPath, ENTITIES_DIR),
@@ -413,6 +417,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException {
String flowRunPath =
getFlowRunPath(userId, clusterId, flowId, flowRunId, appId);
@@ -422,6 +427,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService
return getEntities(dir, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve);
+ eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
index 3e32128..71dd0a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
@@ -58,14 +59,14 @@ class FlowActivityEntityReader extends TimelineEntityReader {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve, true);
+ eventFilters, null, null, fieldsToRetrieve, true);
}
public FlowActivityEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
- fieldsToRetrieve);
+ null, null, fieldsToRetrieve);
}
/**
@@ -96,15 +97,20 @@ class FlowActivityEntityReader extends TimelineEntityReader {
}
@Override
- protected Result getResult(Configuration hbaseConf, Connection conn)
- throws IOException {
+ protected FilterList constructFilterListBasedOnFields() {
+ return null;
+ }
+
+ @Override
+ protected Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
throw new UnsupportedOperationException(
"we don't support a single entity query");
}
@Override
protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn) throws IOException {
+ Connection conn, FilterList filterList) throws IOException {
Scan scan = new Scan();
if (createdTimeBegin == DEFAULT_BEGIN_TIME &&
createdTimeEnd == DEFAULT_END_TIME) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
index ebf2d27..1895fa6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
@@ -28,12 +28,22 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
@@ -54,18 +64,20 @@ class FlowRunEntityReader extends TimelineEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve, true);
+ eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true);
}
public FlowRunEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve) {
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
- fieldsToRetrieve);
+ null, metricsToRetrieve, fieldsToRetrieve);
}
/**
@@ -101,26 +113,69 @@ class FlowRunEntityReader extends TimelineEntityReader {
if (createdTimeEnd == null) {
createdTimeEnd = DEFAULT_END_TIME;
}
+ if (!fieldsToRetrieve.contains(Field.METRICS) &&
+ metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ fieldsToRetrieve.add(Field.METRICS);
+ }
+ }
+ }
+
+ @Override
+ protected FilterList constructFilterListBasedOnFields() {
+ FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+
+ // By default fetch everything in INFO column family.
+ FamilyFilter infoColumnFamily =
+ new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
+ // Metrics not required.
+ if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) &&
+ !fieldsToRetrieve.contains(Field.ALL)) {
+ FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
+ infoColFamilyList.addFilter(infoColumnFamily);
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
+ list.addFilter(infoColFamilyList);
+ }
+ if (metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ FilterList infoColFamilyList = new FilterList();
+ infoColFamilyList.addFilter(infoColumnFamily);
+ infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ FlowRunColumnPrefix.METRIC, metricsToRetrieve));
+ list.addFilter(infoColFamilyList);
}
+ return list;
}
@Override
- protected Result getResult(Configuration hbaseConf, Connection conn)
- throws IOException {
+ protected Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
byte[] rowKey =
FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ get.setFilter(filterList);
+ }
return table.getResult(hbaseConf, conn, get);
}
@Override
protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn) throws IOException {
+ Connection conn, FilterList filterList) throws IOException {
Scan scan = new Scan();
scan.setRowPrefixFilter(
FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowId));
- scan.setFilter(new PageFilter(limit));
+ FilterList newList = new FilterList();
+ newList.addFilter(new PageFilter(limit));
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ newList.addFilter(filterList);
+ }
+ scan.setFilter(newList);
return table.getResultScanner(hbaseConf, conn, scan);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
index 04fc8ee..dcb8b89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
@@ -32,9 +32,18 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
@@ -46,6 +55,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@@ -72,18 +82,21 @@ class GenericEntityReader extends TimelineEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve, sortedKeys);
+ eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
+ sortedKeys);
}
public GenericEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve) {
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
- fieldsToRetrieve);
+ confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
}
/**
@@ -93,6 +106,85 @@ class GenericEntityReader extends TimelineEntityReader {
return ENTITY_TABLE;
}
+ @Override
+ protected FilterList constructFilterListBasedOnFields() {
+ FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+ // Fetch all the columns.
+ if (fieldsToRetrieve.contains(Field.ALL) &&
+ (confsToRetrieve == null ||
+ confsToRetrieve.getFilterList().isEmpty()) &&
+ (metricsToRetrieve == null ||
+ metricsToRetrieve.getFilterList().isEmpty())) {
+ return list;
+ }
+ FilterList infoColFamilyList = new FilterList();
+ // By default fetch everything in INFO column family.
+ FamilyFilter infoColumnFamily =
+ new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
+ infoColFamilyList.addFilter(infoColumnFamily);
+ // Events not required.
+ if (!fieldsToRetrieve.contains(Field.EVENTS) &&
+ !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ EntityColumnPrefix.EVENT.getColumnPrefixBytes(""))));
+ }
+ // info not required.
+ if (!fieldsToRetrieve.contains(Field.INFO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ EntityColumnPrefix.INFO.getColumnPrefixBytes(""))));
+ }
+ // is related to not required.
+ if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
+ }
+ // relates to not required.
+ if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
+ }
+ list.addFilter(infoColFamilyList);
+ if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
+ (confsToRetrieve != null &&
+ !confsToRetrieve.getFilterList().isEmpty())) {
+ FilterList filterCfg =
+ new FilterList(new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes())));
+ if (confsToRetrieve != null &&
+ !confsToRetrieve.getFilterList().isEmpty()) {
+ filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ EntityColumnPrefix.CONFIG, confsToRetrieve));
+ }
+ list.addFilter(filterCfg);
+ }
+ if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
+ (metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty())) {
+ FilterList filterMetrics =
+ new FilterList(new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(EntityColumnFamily.METRICS.getBytes())));
+ if (metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ EntityColumnPrefix.METRIC, metricsToRetrieve));
+ }
+ list.addFilter(filterMetrics);
+ }
+ return list;
+ }
+
protected FlowContext lookupFlowContext(String clusterId, String appId,
Configuration hbaseConf, Connection conn) throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
@@ -145,6 +237,15 @@ class GenericEntityReader extends TimelineEntityReader {
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.noneOf(Field.class);
}
+ if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
+ confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
+ fieldsToRetrieve.add(Field.CONFIGS);
+ }
+ if (!fieldsToRetrieve.contains(Field.METRICS) &&
+ metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ fieldsToRetrieve.add(Field.METRICS);
+ }
if (!singleEntityRead) {
if (limit == null || limit < 0) {
limit = TimelineReader.DEFAULT_LIMIT;
@@ -165,25 +266,31 @@ class GenericEntityReader extends TimelineEntityReader {
}
@Override
- protected Result getResult(Configuration hbaseConf, Connection conn)
- throws IOException {
+ protected Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
byte[] rowKey =
EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
entityType, entityId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ get.setFilter(filterList);
+ }
return table.getResult(hbaseConf, conn, get);
}
@Override
protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn) throws IOException {
+ Connection conn, FilterList filterList) throws IOException {
// Scan through part of the table to find the entities belong to one app
// and one type
Scan scan = new Scan();
scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
clusterId, userId, flowId, flowRunId, appId, entityType));
scan.setMaxVersions(Integer.MAX_VALUE);
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ scan.setFilter(filterList);
+ }
return table.getResultScanner(hbaseConf, conn, scan);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 889ae19..9e4b26a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
public class HBaseTimelineReaderImpl
extends AbstractService implements TimelineReader {
@@ -64,11 +65,13 @@ public class HBaseTimelineReaderImpl
@Override
public TimelineEntity getEntity(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve)
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve)
throws IOException {
TimelineEntityReader reader =
TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId,
- flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve);
+ flowId, flowRunId, appId, entityType, entityId, confsToRetrieve,
+ metricsToRetrieve, fieldsToRetrieve);
return reader.readEntity(hbaseConf, conn);
}
@@ -80,13 +83,15 @@ public class HBaseTimelineReaderImpl
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException {
TimelineEntityReader reader =
TimelineEntityReaderFactory.createMultipleEntitiesReader(userId,
clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
- metricFilters, eventFilters, fieldsToRetrieve);
+ metricFilters, eventFilters, confsToRetrieve, metricsToRetrieve,
+ fieldsToRetrieve);
return reader.readEntities(hbaseConf, conn);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
index adaf42e..7178aab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
@@ -31,8 +31,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
@@ -70,6 +72,8 @@ abstract class TimelineEntityReader {
protected Map<String, String> configFilters;
protected Set<String> metricFilters;
protected Set<String> eventFilters;
+ protected TimelineFilterList confsToRetrieve;
+ protected TimelineFilterList metricsToRetrieve;
/**
* Main table the entity reader uses.
@@ -94,6 +98,7 @@ abstract class TimelineEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
this.singleEntityRead = false;
this.sortedKeys = sortedKeys;
@@ -115,6 +120,8 @@ abstract class TimelineEntityReader {
this.configFilters = configFilters;
this.metricFilters = metricFilters;
this.eventFilters = eventFilters;
+ this.confsToRetrieve = confsToRetrieve;
+ this.metricsToRetrieve = metricsToRetrieve;
this.table = getTable();
}
@@ -124,7 +131,8 @@ abstract class TimelineEntityReader {
*/
protected TimelineEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve) {
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
this.singleEntityRead = true;
this.userId = userId;
this.clusterId = clusterId;
@@ -134,11 +142,21 @@ abstract class TimelineEntityReader {
this.entityType = entityType;
this.fieldsToRetrieve = fieldsToRetrieve;
this.entityId = entityId;
+ this.confsToRetrieve = confsToRetrieve;
+ this.metricsToRetrieve = metricsToRetrieve;
this.table = getTable();
}
/**
+ * Creates a {@link FilterList} based on fields, confs and metrics to
+ * retrieve. This filter list will be set in Scan/Get objects to trim down
+ * results fetched from HBase back-end storage.
+ * @return a {@link FilterList} object.
+ */
+ protected abstract FilterList constructFilterListBasedOnFields();
+
+ /**
* Reads and deserializes a single timeline entity from the HBase storage.
*/
public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
@@ -146,7 +164,8 @@ abstract class TimelineEntityReader {
validateParams();
augmentParams(hbaseConf, conn);
- Result result = getResult(hbaseConf, conn);
+ FilterList filterList = constructFilterListBasedOnFields();
+ Result result = getResult(hbaseConf, conn, filterList);
if (result == null || result.isEmpty()) {
// Could not find a matching row.
LOG.info("Cannot find matching entity of type " + entityType);
@@ -166,7 +185,8 @@ abstract class TimelineEntityReader {
augmentParams(hbaseConf, conn);
NavigableSet<TimelineEntity> entities = new TreeSet<>();
- ResultScanner results = getResults(hbaseConf, conn);
+ FilterList filterList = constructFilterListBasedOnFields();
+ ResultScanner results = getResults(hbaseConf, conn, filterList);
try {
for (Result result : results) {
TimelineEntity entity = parseEntity(result);
@@ -211,14 +231,14 @@ abstract class TimelineEntityReader {
*
* @return the {@link Result} instance or null if no such record is found.
*/
- protected abstract Result getResult(Configuration hbaseConf, Connection conn)
- throws IOException;
+ protected abstract Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException;
/**
* Fetches a {@link ResultScanner} for a multi-entity read.
*/
protected abstract ResultScanner getResults(Configuration hbaseConf,
- Connection conn) throws IOException;
+ Connection conn, FilterList filterList) throws IOException;
/**
* Given a {@link Result} instance, deserializes and creates a
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
index f5341c2..16204c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
/**
@@ -34,22 +35,23 @@ class TimelineEntityReaderFactory {
*/
public static TimelineEntityReader createSingleEntityReader(String userId,
String clusterId, String flowId, Long flowRunId, String appId,
- String entityType, String entityId, EnumSet<Field> fieldsToRetrieve) {
+ String entityType, String entityId, TimelineFilterList confs,
+ TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) {
// currently the types that are handled separate from the generic entity
// table are application, flow run, and flow activity entities
if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
- appId, entityType, entityId, fieldsToRetrieve);
+ appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
- appId, entityType, entityId, fieldsToRetrieve);
+ appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, entityId, fieldsToRetrieve);
} else {
// assume we're dealing with a generic entity read
return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
- appId, entityType, entityId, fieldsToRetrieve);
+ appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
}
}
@@ -64,6 +66,7 @@ class TimelineEntityReaderFactory {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confs, TimelineFilterList metrics,
EnumSet<Field> fieldsToRetrieve) {
// currently the types that are handled separate from the generic entity
// table are application, flow run, and flow activity entities
@@ -71,8 +74,8 @@ class TimelineEntityReaderFactory {
return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
- infoFilters, configFilters, metricFilters, eventFilters,
- fieldsToRetrieve);
+ infoFilters, configFilters, metricFilters, eventFilters, confs,
+ metrics, fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
@@ -83,15 +86,15 @@ class TimelineEntityReaderFactory {
return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
- infoFilters, configFilters, metricFilters, eventFilters,
- fieldsToRetrieve);
+ infoFilters, configFilters, metricFilters, eventFilters, confs,
+ metrics, fieldsToRetrieve);
} else {
// assume we're dealing with a generic entity read
return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
- infoFilters, configFilters, metricFilters, eventFilters,
- fieldsToRetrieve, false);
+ infoFilters, configFilters, metricFilters, eventFilters, confs,
+ metrics, fieldsToRetrieve, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
index e4e305e..0ed17da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
/** ATSv2 reader interface. */
@Private
@@ -70,6 +72,18 @@ public interface TimelineReader extends Service {
* Entity type (mandatory)
* @param entityId
* Entity Id (mandatory)
+ * @param confsToRetrieve
+ * Used for deciding which configs to return in response. This is
+ * represented as a {@link TimelineFilterList} object containing
+ * {@link TimelinePrefixFilter} objects. These can either be exact config
+ * keys' or prefixes which are then compared against config keys' to decide
+ * configs to return in response.
+ * @param metricsToRetrieve
+ * Used for deciding which metrics to return in response. This is
+ * represented as a {@link TimelineFilterList} object containing
+ * {@link TimelinePrefixFilter} objects. These can either be exact metric
+ * ids' or prefixes which are then compared against metric ids' to decide
+ * metrics to return in response.
* @param fieldsToRetrieve
* Specifies which fields of the entity object to retrieve(optional), see
* {@link Field}. If null, retrieves 4 fields namely entity id,
@@ -81,6 +95,7 @@ public interface TimelineReader extends Service {
*/
TimelineEntity getEntity(String userId, String clusterId, String flowId,
Long flowRunId, String appId, String entityType, String entityId,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException;
/**
@@ -139,6 +154,22 @@ public interface TimelineReader extends Service {
* @param eventFilters
* Matched entities should contain the given events (optional). If null
* or empty, the filter is not applied.
+ * @param confsToRetrieve
+ * Used for deciding which configs to return in response. This is
+ * represented as a {@link TimelineFilterList} object containing
+ * {@link TimelinePrefixFilter} objects. These can either be exact config
+ * keys' or prefixes which are then compared against config keys' to decide
+ * configs(inside entities) to return in response. This should not be
+ * confused with configFilters which is used to decide which entities to
+ * return instead.
+ * @param metricsToRetrieve
+ * Used for deciding which metrics to return in response. This is
+ * represented as a {@link TimelineFilterList} object containing
+ * {@link TimelinePrefixFilter} objects. These can either be exact metric
+ * ids' or prefixes which are then compared against metric ids' to decide
+ * metrics(inside entities) to return in response. This should not be
+ * confused with metricFilters which is used to decide which entities to
+ * return instead.
* @param fieldsToRetrieve
* Specifies which fields of the entity object to retrieve(optional), see
* {@link Field}. If null, retrieves 4 fields namely entity id,
@@ -158,5 +189,6 @@ public interface TimelineReader extends Service {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index b06f5c1..056e51f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -119,6 +119,18 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
return columnPrefix;
}
+ @Override
+ public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
+ @Override
+ public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
/*
* (non-Javadoc)
*
@@ -139,8 +151,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier =
- ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
@@ -166,8 +177,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier =
- ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index db49098..0f3ac4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -44,13 +44,13 @@ public interface ColumnPrefix<T> {
* @param qualifier column qualifier. Nothing gets written when null.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
- *@param attributes attributes for the mutation that are used by the coprocessor
- * to set/read the cell tags
+ * @param attributes attributes for the mutation that are used by the
+ * coprocessor to set/read the cell tags.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
- public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+ void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
byte[] qualifier, Long timestamp, Object inputValue,
Attribute... attributes) throws IOException;
@@ -65,13 +65,13 @@ public interface ColumnPrefix<T> {
* @param qualifier column qualifier. Nothing gets written when null.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
- *@param attributes attributes for the mutation that are used by the coprocessor
- * to set/read the cell tags
+ * @param attributes attributes for the mutation that are used by the
+ * coprocessor to set/read the cell tags.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
- public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+ void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
String qualifier, Long timestamp, Object inputValue,
Attribute... attributes) throws IOException;
@@ -86,7 +86,7 @@ public interface ColumnPrefix<T> {
* in the result.
* @throws IOException
*/
- public Object readResult(Result result, String qualifier) throws IOException;
+ Object readResult(Result result, String qualifier) throws IOException;
/**
* @param result from which to read columns
@@ -94,7 +94,7 @@ public interface ColumnPrefix<T> {
* (or all of them if the prefix value is null).
* @throws IOException
*/
- public Map<String, Object> readResults(Result result) throws IOException;
+ Map<String, Object> readResults(Result result) throws IOException;
/**
* @param result from which to reads data with timestamps
@@ -104,7 +104,18 @@ public interface ColumnPrefix<T> {
* idB={timestamp3->value3}, idC={timestamp1->value4}}
* @throws IOException
*/
- public <V> NavigableMap<String, NavigableMap<Long, V>>
+ <V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result) throws IOException;
+ /**
+ * @param qualifierPrefix Column qualifier or prefix of qualifier.
+ * @return a byte array encoding column prefix and qualifier/prefix passed.
+ */
+ byte[] getColumnPrefixBytes(String qualifierPrefix);
+
+ /**
+ * @param qualifierPrefix Column qualifier or prefix of qualifier.
+ * @return a byte array encoding column prefix and qualifier/prefix passed.
+ */
+ byte[] getColumnPrefixBytes(byte[] qualifierPrefix);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76bc71cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index abede9c..5b71228 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -119,6 +119,18 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
return columnPrefix;
}
+ @Override
+ public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
+ @Override
+ public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
/*
* (non-Javadoc)
*
@@ -140,8 +152,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier =
- ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
@@ -167,8 +178,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier =
- ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);