You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:28 UTC
[21/50] [abbrv] TEZ-443. Merge tez-dag-api and tez-engine-api into a
single module - tez-api (part of TEZ-398). (sseth)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/Constants.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/Constants.java b/tez-common/src/main/java/org/apache/tez/common/Constants.java
deleted file mode 100644
index 8ea2909..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/Constants.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.common;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-
-
-public class Constants {
-
- // TODO NEWTEZ Check which of these constants are expecting specific pieces of information which are being removed - like taskAttemptId
-
- public static final String TEZ = "tez";
-
- public static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
- public static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
- public static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
-
- public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
- public static String MERGED_OUTPUT_PREFIX = ".merged";
-
- // TODO NEWTEZ Remove this constant once the old code is removed.
- public static final String TEZ_ENGINE_TASK_ATTEMPT_ID =
- "tez.engine.task.attempt.id";
-
- public static final String TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING = "file.out";
-
- public static final String TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING = ".index";
-
- public static final String TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING = "%s/task_%d.out";
-
- public static final String TEZ_ENGINE_JOB_CREDENTIALS =
- "tez.engine.job.credentials";
-
- @Private
- public static final String TEZ_ENGINE_TASK_MEMORY = "tez.engine.task.memory";
-
- public static final String TASK_OUTPUT_DIR = "output";
-
- public static final String TEZ_ENGINE_TASK_OUTPUT_MANAGER =
- "tez.engine.task.local.output.manager";
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/ContainerContext.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/ContainerContext.java b/tez-common/src/main/java/org/apache/tez/common/ContainerContext.java
deleted file mode 100644
index df92bdc..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/ContainerContext.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.common;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-// TODO EVENTUALLY move this over to PB. Fix package/module.
-// TODO EVENTUALLY unit tests for functionality.
-public class ContainerContext implements Writable {
-
- String containerIdentifier;
- String pid;
-
- public ContainerContext() {
- containerIdentifier = "";
- pid = "";
- }
-
- public ContainerContext(String containerIdStr, String pid) {
- this.containerIdentifier = containerIdStr;
- this.pid = pid;
- }
-
- public String getContainerIdentifier() {
- return containerIdentifier;
- }
-
- public String getPid() {
- return pid;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.containerIdentifier = Text.readString(in);
- this.pid = Text.readString(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, containerIdentifier);
- Text.writeString(out, pid);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/InputSpec.java b/tez-common/src/main/java/org/apache/tez/common/InputSpec.java
deleted file mode 100644
index e9faa26..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/InputSpec.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-public class InputSpec implements Writable {
-
- private String vertexName;
- private int inDegree;
- private String inputClassName;
-
- public InputSpec() {
- }
-
- public InputSpec(String vertexName, int inDegree,
- String inputClassName) {
- this.vertexName = vertexName;
- this.inDegree = inDegree;
- this.inputClassName = inputClassName;
- }
-
- /**
- * @return the name of the input vertex.
- */
- public String getVertexName() {
- return this.vertexName;
- }
-
- /**
- * @return the number of inputs for this task, which will be available from
- * the specified vertex.
- */
- public int getNumInputs() {
- return this.inDegree;
- }
-
- /**
- * @return Input class name
- */
- public String getInputClassName() {
- return this.inputClassName;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, vertexName);
- out.writeInt(inDegree);
- Text.writeString(out, inputClassName);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- vertexName = Text.readString(in);
- this.inDegree = in.readInt();
- inputClassName = Text.readString(in);
- }
-
- @Override
- public String toString() {
- return "VertexName: " + vertexName + ", InDegree: " + inDegree
- + ", InputClassName=" + inputClassName;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/OutputSpec.java b/tez-common/src/main/java/org/apache/tez/common/OutputSpec.java
deleted file mode 100644
index 52ec5d8..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/OutputSpec.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-public class OutputSpec implements Writable {
-
- private String vertexName;
- private int outDegree;
- private String outputClassName;
-
- public OutputSpec() {
- }
-
- public OutputSpec(String vertexName, int outDegree,
- String outputClassName) {
- this.vertexName = vertexName;
- this.outDegree = outDegree;
- this.outputClassName = outputClassName;
- }
-
- /**
- * @return the name of the output vertex.
- */
- public String getVertexName() {
- return this.vertexName;
- }
-
- /**
- * @return the number of outputs to be generated by this task.
- */
- public int getNumOutputs() {
- return this.outDegree;
- }
-
- /**
- * @return Output class name
- */
- public String getOutputClassName() {
- return this.outputClassName;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, vertexName);
- out.writeInt(outDegree);
- Text.writeString(out, outputClassName);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- vertexName = Text.readString(in);
- this.outDegree = in.readInt();
- outputClassName = Text.readString(in);
- }
-
- @Override
- public String toString() {
- return "VertexName: " + vertexName + ", OutDegree: " + outDegree
- + ", OutputClassName=" + outputClassName;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
deleted file mode 100644
index 7c4540c..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.common;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-
-
-/**
- * Meant for user configurable job properties. For others look at {@link Constants}
- *
- */
-
-// TODO EVENTUALLY A description for each property.
-@Private
-@Evolving
-public class TezJobConfig {
-
-
-
-
- /** The number of milliseconds between progress reports. */
- public static final int PROGRESS_INTERVAL = 3000;
-
- public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;
-
- /**
- * Configuration key to enable/disable IFile readahead.
- */
- public static final String TEZ_ENGINE_IFILE_READAHEAD =
- "tez.engine.ifile.readahead";
- public static final boolean DEFAULT_TEZ_ENGINE_IFILE_READAHEAD = true;
-
- /**
- * Configuration key to set the IFile readahead length in bytes.
- */
- public static final String TEZ_ENGINE_IFILE_READAHEAD_BYTES =
- "tez.engine.ifile.readahead.bytes";
- public static final int DEFAULT_TEZ_ENGINE_IFILE_READAHEAD_BYTES =
- 4 * 1024 * 1024;
-
- /**
- *
- */
- public static final String RECORDS_BEFORE_PROGRESS =
- "tez.task.merge.progress.records";
- public static final long DEFAULT_RECORDS_BEFORE_PROGRESS = 10000;
-
- /**
- * List of directories avialble to the engine.
- */
- public static final String LOCAL_DIRS = "tez.engine.local.dirs";
- public static final String DEFAULT_LOCAL_DIRS = "/tmp";
-
- /**
- * One local dir for the speicfic job.
- */
- public static final String JOB_LOCAL_DIR = "tez.engine.job.local.dir";
-
- /**
- * The directory which contains the localized files for this task.
- */
- @Private
- public static final String TASK_LOCAL_RESOURCE_DIR = "tez.engine.task-local-resource.dir";
- public static final String DEFAULT_TASK_LOCAL_RESOURCE_DIR = "/tmp";
-
- public static final String TEZ_TASK_WORKING_DIR = "tez.engine.task.working.dir";
-
- /**
- *
- */
- public static final String TEZ_ENGINE_IO_SORT_FACTOR =
- "tez.engine.io.sort.factor";
- public static final int DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR = 100;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SORT_SPILL_PERCENT =
- "tez.engine.sort.spill.percent";
- public static float DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT = 0.8f;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_IO_SORT_MB = "tez.engine.io.sort.mb";
- public static final int DEFAULT_TEZ_ENGINE_IO_SORT_MB = 100;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES =
- "tez.engine.index.cache.memory.limit.bytes";
- public static final int DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES =
- 1024 * 1024;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_COMBINE_MIN_SPILLS =
- "tez.engine.combine.min.spills";
- public static final int DEFAULT_TEZ_ENGINE_COMBINE_MIN_SPILLS = 3;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SORT_THREADS =
- "tez.engine.sort.threads";
- public static final int DEFAULT_TEZ_ENGINE_SORT_THREADS = 1;
-
- /**
- * Specifies a partitioner class, which is used in Tez engine components like OnFileSortedOutput
- */
- public static final String TEZ_ENGINE_PARTITIONER_CLASS = "tez.engine.partitioner.class";
-
- /**
- * Specifies a combiner class (primarily for Shuffle)
- */
- public static final String TEZ_ENGINE_COMBINER_CLASS = "tez.engine.combiner.class";
-
- public static final String TEZ_ENGINE_NUM_EXPECTED_PARTITIONS = "tez.engine.num.expected.partitions";
-
- /**
- *
- */
- public static final String COUNTERS_MAX_KEY = "tez.engine.job.counters.max";
- public static final int COUNTERS_MAX_DEFAULT = 120;
-
- /**
- *
- */
- public static final String COUNTER_GROUP_NAME_MAX_KEY = "tez.engine.job.counters.group.name.max";
- public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;
-
- /**
- *
- */
- public static final String COUNTER_NAME_MAX_KEY = "tez.engine.job.counters.counter.name.max";
- public static final int COUNTER_NAME_MAX_DEFAULT = 64;
-
- /**
- *
- */
- public static final String COUNTER_GROUPS_MAX_KEY = "tez.engine.job.counters.groups.max";
- public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;
-
-
- /**
- * Temporary interface for MR only (not chained Tez) to indicate whether
- * in-memory shuffle should be used.
- */
- @Private
- public static final String TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY =
- "tez.engine.shuffle.use.in-memory";
- public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY = false;
-
- // TODO NEWTEZ Remove these config parameters. Will be part of an event.
- @Private
- public static final String TEZ_ENGINE_SHUFFLE_PARTITION_RANGE =
- "tez.engine.shuffle.partition-range";
- public static int TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT = 1;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES =
- "tez.engine.shuffle.parallel.copies";
- public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES = 20;
-
- /**
- * TODO Is this user configurable.
- */
- public static final String TEZ_ENGINE_METRICS_SESSION_ID =
- "tez.engine.metrics.session.id";
- public static final String DEFAULT_TEZ_ENGINE_METRICS_SESSION_ID = "";
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SHUFFLE_FETCH_FAILURES =
- "tez.engine.shuffle.fetch.failures.limit";
- public final static int DEFAULT_TEZ_ENGINE_SHUFFLE_FETCH_FAILURES_LIMIT = 10;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR =
- "tez.engine.shuffle.notify.readerror";
- public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR = true;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT =
- "tez.engine.shuffle.connect.timeout";
- public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT =
- 3 * 60 * 1000;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SHUFFLE_READ_TIMEOUT = "tez.engine.shuffle.read.timeout";
- public final static int DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT =
- 3 * 60 * 1000;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SHUFFLE_ENABLE_SSL =
- "tez.engine.shuffle.ssl.enable";
- public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL = false;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT =
- "tez.engine.shuffle.input.buffer.percent";
- public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT =
- 0.90f;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT =
- "tez.engine.shuffle.memory.limit.percent";
- public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT =
- 0.25f;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SHUFFLE_MERGE_PERCENT =
- "tez.engine.shuffle.merge.percent";
- public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_MERGE_PERCENT = 0.90f;
-
- /**
- * TODO TEZAM3 default value ?
- */
- public static final String TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS =
- "tez.engine.shuffle.memory-to-memory.segments";
-
- /**
- *
- */
- public static final String TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM =
- "tez.engine.shuffle.memory-to-memory.enable";
- public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM =
- false;
-
- /**
- *
- */
- public static final String TEZ_ENGINE_INPUT_BUFFER_PERCENT =
- "tez.engine.task.input.buffer.percent";
- public static final float DEFAULT_TEZ_ENGINE_INPUT_BUFFER_PERCENT = 0.0f;
-
- // TODO Rename.
- public static final String TEZ_ENGINE_GROUP_COMPARATOR_CLASS =
- "tez.engine.group.comparator.class";
-
- // TODO Better name.
- public static final String TEZ_ENGINE_INTERNAL_SORTER_CLASS =
- "tez.engine.internal.sorter.class";
-
- public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS =
- "tez.engine.intermediate-output.key.comparator.class";
- public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS =
- "tez.engine.intermediate-input.key.comparator.class";
-
- public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS =
- "tez.engine.intermediate-output.key.class";
- public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS =
- "tez.engine.intermediate-input.key.class";
-
- public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS =
- "tez.engine.intermediate-output.value.class";
- public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS =
- "tez.engine.intermediate-input.value.class";
-
- public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS =
- "tez.engine.intermediate-output.should-compress";
- public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED =
- "tez.engine.intermdiate-input.is-compressed";
-
- public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC =
- "tez.engine.intermediate-output.compress.codec";
- public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC =
- "tez.engine.intermediate-input.compress.codec";
-
- public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS =
- "tez.engine.intermediate-input.key.secondary.comparator.class";
-
- // TODO This should be in DAGConfiguration
- /* config for tracking the local file where all the credentials for the job
- * credentials.
- */
- public static final String DAG_CREDENTIALS_BINARY = "tez.dag.credentials.binary";
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java b/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java
deleted file mode 100644
index 4eed78b..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.common;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public abstract class TezTaskContext implements Writable {
-
- // Serialized Fields
- private TezTaskAttemptID taskAttemptId;
- private String user;
- private String jobName;
- private String vertexName;
-
- public TezTaskContext() {
- }
-
- public TezTaskContext(TezTaskAttemptID taskAttemptID, String user, String jobName,
- String vertexName) {
- this.taskAttemptId = taskAttemptID;
- this.user = user;
- this.jobName = jobName;
- this.vertexName = vertexName;
- }
-
- public TezTaskAttemptID getTaskAttemptId() {
- return taskAttemptId;
- }
-
-
-
- public TezDAGID getDAGID() {
- return taskAttemptId.getTaskID().getVertexID().getDAGId();
- }
-
- public String getUser() {
- return user;
- }
-
- public String getJobName() {
- return jobName;
- }
-
- public String getVertexName() {
- return this.vertexName;
- }
-
- public void statusUpdate() throws IOException, InterruptedException {
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- taskAttemptId.write(out);
- Text.writeString(out, user);
- Text.writeString(out, jobName);
- Text.writeString(out, vertexName);
- }
-
- public void readFields(DataInput in) throws IOException {
- taskAttemptId = TezTaskAttemptID.read(in);
- user = Text.readString(in);
- jobName = Text.readString(in);
- vertexName = Text.readString(in);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/TezTaskStatus.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezTaskStatus.java b/tez-common/src/main/java/org/apache/tez/common/TezTaskStatus.java
deleted file mode 100644
index de81f87..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/TezTaskStatus.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.common;
-
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public interface TezTaskStatus extends Writable {
-
- //enumeration for reporting current phase of a task.
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
-
- // what state is the task in?
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
- COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
-
- public abstract TezTaskAttemptID getTaskAttemptId();
-
- public abstract float getProgress();
-
- public abstract void setProgress(float progress);
-
- public abstract State getRunState();
-
- public abstract void setRunState(State runState);
-
- public abstract String getDiagnosticInfo();
-
- public abstract void setDiagnosticInfo(String info);
-
- // TODOTEZDAG Remove stateString / rename
- public abstract String getStateString();
-
- public abstract void setStateString(String stateString);
-
- public abstract long getFinishTime();
-
- public abstract void setFinishTime(long finishTime);
-
- // TODOTEZDAG Can shuffle / merge be made generic ? Otherwise just a single finish time.
- public abstract long getShuffleFinishTime();
-
- public abstract void setShuffleFinishTime(long shuffleFinishTime);
-
- public abstract long getMapFinishTime();
-
- public abstract void setMapFinishTime(long mapFinishTime);
-
- public abstract long getSortFinishTime();
-
- public abstract void setSortFinishTime(long sortFinishTime);
-
- public abstract long getStartTime();
-
- public abstract void setStartTime(long startTime);
-
- // TODOTEZDAG Remove phase
- public abstract Phase getPhase();
-
- public abstract void setPhase(Phase phase);
-
- public abstract TezCounters getCounters();
-
- public abstract void setCounters(TezCounters counters);
-
- public abstract List<TezTaskAttemptID> getFailedDependencies();
-
- public abstract void addFailedDependency(TezTaskAttemptID taskAttempttId);
-
- public abstract void clearStatus();
-
- public abstract void statusUpdate(float f, String string, TezCounters counters);
-
- // TODOTEZDAG maybe remove ?
- public abstract long getLocalOutputSize();
-
- public abstract void setOutputSize(long l);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounter.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounter.java b/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounter.java
deleted file mode 100644
index e64a26c..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounter.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common.counters;
-
-import com.google.common.base.Objects;
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * An abstract counter class to provide common implementation of
- * the counter interface in both mapred and mapreduce packages.
- */
-@InterfaceAudience.Private
-public abstract class AbstractCounter implements TezCounter {
-
- @Deprecated
- @Override
- public void setDisplayName(String name) {}
-
- @Override
- public synchronized boolean equals(Object genericRight) {
- if (genericRight instanceof TezCounter) {
- synchronized (genericRight) {
- TezCounter right = (TezCounter) genericRight;
- return getName().equals(right.getName()) &&
- getDisplayName().equals(right.getDisplayName()) &&
- getValue() == right.getValue();
- }
- }
- return false;
- }
-
- @Override
- public synchronized int hashCode() {
- return Objects.hashCode(getName(), getDisplayName(), getValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java b/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
deleted file mode 100644
index d8896ed..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common.counters;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableUtils;
-
-import com.google.common.collect.Iterators;
-
-/**
- * An abstract class to provide common implementation of the
- * generic counter group in both mapred and mapreduce package.
- *
- * @param <T> type of the counter for the group
- */
-@InterfaceAudience.Private
-public abstract class AbstractCounterGroup<T extends TezCounter>
- implements CounterGroupBase<T> {
-
- private final String name;
- private String displayName;
- private final ConcurrentMap<String, T> counters =
- new ConcurrentSkipListMap<String, T>();
- private final Limits limits;
-
- public AbstractCounterGroup(String name, String displayName,
- Limits limits) {
- this.name = name;
- this.displayName = displayName;
- this.limits = limits;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public synchronized String getDisplayName() {
- return displayName;
- }
-
- @Override
- public synchronized void setDisplayName(String displayName) {
- this.displayName = displayName;
- }
-
- @Override
- public synchronized void addCounter(T counter) {
- counters.put(counter.getName(), counter);
- limits.incrCounters();
- }
-
- @Override
- public synchronized T addCounter(String counterName, String displayName,
- long value) {
- String saveName = Limits.filterCounterName(counterName);
- T counter = findCounterImpl(saveName, false);
- if (counter == null) {
- return addCounterImpl(saveName, displayName, value);
- }
- counter.setValue(value);
- return counter;
- }
-
- private T addCounterImpl(String name, String displayName, long value) {
- T counter = newCounter(name, displayName, value);
- addCounter(counter);
- return counter;
- }
-
- @Override
- public synchronized T findCounter(String counterName, String displayName) {
- // Take lock to avoid two threads not finding a counter and trying to add
- // the same counter.
- String saveName = Limits.filterCounterName(counterName);
- T counter = findCounterImpl(saveName, false);
- if (counter == null) {
- return addCounterImpl(saveName, displayName, 0);
- }
- return counter;
- }
-
- @Override
- public T findCounter(String counterName, boolean create) {
- return findCounterImpl(Limits.filterCounterName(counterName), create);
- }
-
- // Lock the object. Cannot simply use concurrent constructs on the counters
- // data-structure (like putIfAbsent) because of localization, limits etc.
- private synchronized T findCounterImpl(String counterName, boolean create) {
- T counter = counters.get(counterName);
- if (counter == null && create) {
- String localized =
- ResourceBundles.getCounterName(getName(), counterName, counterName);
- return addCounterImpl(counterName, localized, 0);
- }
- return counter;
- }
-
- @Override
- public T findCounter(String counterName) {
- return findCounter(counterName, true);
- }
-
- /**
- * Abstract factory method to create a new counter of type T
- * @param counterName of the counter
- * @param displayName of the counter
- * @param value of the counter
- * @return a new counter
- */
- protected abstract T newCounter(String counterName, String displayName,
- long value);
-
- /**
- * Abstract factory method to create a new counter of type T
- * @return a new counter object
- */
- protected abstract T newCounter();
-
- @Override
- public Iterator<T> iterator() {
- return counters.values().iterator();
- }
-
- /**
- * GenericGroup ::= displayName #counter counter*
- */
- @Override
- public synchronized void write(DataOutput out) throws IOException {
- Text.writeString(out, displayName);
- WritableUtils.writeVInt(out, counters.size());
- for(TezCounter counter: counters.values()) {
- counter.write(out);
- }
- }
-
- @Override
- public synchronized void readFields(DataInput in) throws IOException {
- displayName = Text.readString(in);
- counters.clear();
- int size = WritableUtils.readVInt(in);
- for (int i = 0; i < size; i++) {
- T counter = newCounter();
- counter.readFields(in);
- counters.put(counter.getName(), counter);
- limits.incrCounters();
- }
- }
-
- @Override
- public synchronized int size() {
- return counters.size();
- }
-
- @Override
- public synchronized boolean equals(Object genericRight) {
- if (genericRight instanceof CounterGroupBase<?>) {
- @SuppressWarnings("unchecked")
- CounterGroupBase<T> right = (CounterGroupBase<T>) genericRight;
- return Iterators.elementsEqual(iterator(), right.iterator());
- }
- return false;
- }
-
- @Override
- public synchronized int hashCode() {
- return counters.hashCode();
- }
-
- @Override
- public void incrAllCounters(CounterGroupBase<T> rightGroup) {
- try {
- for (TezCounter right : rightGroup) {
- TezCounter left = findCounter(right.getName(), right.getDisplayName());
- left.increment(right.getValue());
- }
- } catch (LimitExceededException e) {
- counters.clear();
- throw e;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounters.java b/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
deleted file mode 100644
index fd4fdee..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common.counters;
-
-import static org.apache.tez.common.counters.CounterGroupFactory.getFrameworkGroupId;
-import static org.apache.tez.common.counters.CounterGroupFactory.isFrameworkGroup;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
-
-/**
- * Base implementation of a counters container: a set of counter groups,
- * split into framework/file-system groups and all other groups, that can be
- * serialized as a {@link Writable}.
- *
- * @param <C> type of counter inside the counters
- * @param <G> type of group inside the counters
- */
-@InterfaceAudience.Public
-@InterfaceStability.Stable
-public abstract class AbstractCounters<C extends TezCounter,
-                                       G extends CounterGroupBase<C>>
-    implements Writable, Iterable<G> {
-
-  protected static final Log LOG = LogFactory.getLog("mapreduce.Counters");
-
-  /**
-   * A cache from enum values to the associated counter.
-   */
-  private Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
-  //framework & fs groups
-  private Map<String, G> fgroups = new ConcurrentSkipListMap<String, G>();
-  // other groups
-  private Map<String, G> groups = new ConcurrentSkipListMap<String, G>();
-  private final CounterGroupFactory<C, G> groupFactory;
-
-  // For framework counter serialization without strings. The ordinals are
-  // written to the stream by write(), so the constant order is part of the
-  // wire format.
-  enum GroupType { FRAMEWORK, FILESYSTEM }
-
-  // Writes only framework and fs counters if false.
-  private boolean writeAllCounters = true;
-
-  // Maps legacy group names to the class names of the current counter enums.
-  private static final Map<String, String> legacyMap = Maps.newHashMap();
-  static {
-    legacyMap.put("org.apache.hadoop.mapred.Task$Counter",
-                  TaskCounter.class.getName());
-    legacyMap.put("org.apache.hadoop.mapred.JobInProgress$Counter",
-                  JobCounter.class.getName());
-    legacyMap.put("FileSystemCounters", FileSystemCounter.class.getName());
-  }
-
-  private final Limits limits = new Limits();
-
-  /**
-   * @param gf the factory used to create counter groups
-   */
-  @InterfaceAudience.Private
-  public AbstractCounters(CounterGroupFactory<C, G> gf) {
-    groupFactory = gf;
-  }
-
-  /**
-   * Construct from another counters object.
-   * @param <C1> type of the other counter
-   * @param <G1> type of the other counter group
-   * @param counters the counters object to copy
-   * @param groupFactory the factory for new groups
-   */
-  @InterfaceAudience.Private
-  public <C1 extends TezCounter, G1 extends CounterGroupBase<C1>>
-  AbstractCounters(AbstractCounters<C1, G1> counters,
-                   CounterGroupFactory<C, G> groupFactory) {
-    this.groupFactory = groupFactory;
-    for (G1 group : counters) {
-      String name = group.getName();
-      G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
-      (isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
-      for (TezCounter counter : group) {
-        newGroup.addCounter(counter.getName(), counter.getDisplayName(),
-                            counter.getValue());
-      }
-    }
-  }
-
-  /** Add a group.
-   * Only non-framework groups are checked against the group limit.
-   * @param group object to add
-   * @return the group
-   */
-  @InterfaceAudience.Private
-  public synchronized G addGroup(G group) {
-    String name = group.getName();
-    if (isFrameworkGroup(name)) {
-      fgroups.put(name, group);
-    } else {
-      limits.checkGroups(groups.size() + 1);
-      groups.put(name, group);
-    }
-    return group;
-  }
-
-  /**
-   * Add a new group
-   * @param name of the group
-   * @param displayName of the group
-   * @return the group
-   */
-  @InterfaceAudience.Private
-  public G addGroup(String name, String displayName) {
-    return addGroup(groupFactory.newGroup(name, displayName, limits));
-  }
-
-  /**
-   * Find a counter, create one if necessary
-   * @param groupName of the counter
-   * @param counterName name of the counter
-   * @return the matching counter
-   */
-  public C findCounter(String groupName, String counterName) {
-    G grp = getGroup(groupName);
-    return grp.findCounter(counterName);
-  }
-
-  /**
-   * Find the counter for the given enum. The same enum will always return the
-   * same counter until {@link #readFields(DataInput)} replaces the contents
-   * of this object.
-   * @param key the counter key
-   * @return the matching counter object
-   */
-  public synchronized C findCounter(Enum<?> key) {
-    C counter = cache.get(key);
-    if (counter == null) {
-      counter = findCounter(key.getDeclaringClass().getName(), key.name());
-      cache.put(key, counter);
-    }
-    return counter;
-  }
-
-  /**
-   * Find the file system counter for the given scheme and enum.
-   * @param scheme of the file system
-   * @param key the enum of the counter
-   * @return the file system counter
-   */
-  @InterfaceAudience.Private
-  public synchronized C findCounter(String scheme, FileSystemCounter key) {
-    return ((FileSystemCounterGroup<C>) getGroup(
-        FileSystemCounter.class.getName()).getUnderlyingGroup()).
-        findCounter(scheme, key);
-  }
-
-  /**
-   * Returns the names of all counter classes. Legacy names are included for
-   * every legacy mapping whose current group is present.
-   * @return Set of counter names.
-   */
-  public synchronized Iterable<String> getGroupNames() {
-    HashSet<String> deprecated = new HashSet<String>();
-    for (Map.Entry<String, String> entry : legacyMap.entrySet()) {
-      String newGroup = entry.getValue();
-      boolean isFGroup = isFrameworkGroup(newGroup);
-      if (isFGroup ? fgroups.containsKey(newGroup)
-                   : groups.containsKey(newGroup)) {
-        deprecated.add(entry.getKey());
-      }
-    }
-    return Iterables.concat(fgroups.keySet(), groups.keySet(), deprecated);
-  }
-
-  /** Iterates the framework/file-system groups first, then the others. */
-  @Override
-  public Iterator<G> iterator() {
-    return Iterators.concat(fgroups.values().iterator(),
-                            groups.values().iterator());
-  }
-
-  /**
-   * Returns the named counter group, or an empty group if there is none
-   * with the specified name. A newly created group is stored in this object;
-   * legacy names are translated and logged as deprecated on creation.
-   * @param groupName name of the group
-   * @return the group
-   */
-  public synchronized G getGroup(String groupName) {
-
-    // filterGroupName
-    boolean groupNameInLegacyMap = true;
-    String newGroupName = legacyMap.get(groupName);
-    if (newGroupName == null) {
-      groupNameInLegacyMap = false;
-      newGroupName = Limits.filterGroupName(groupName);
-    }
-
-    boolean isFGroup = isFrameworkGroup(newGroupName);
-    G group = isFGroup ? fgroups.get(newGroupName) : groups.get(newGroupName);
-    if (group == null) {
-      group = groupFactory.newGroup(newGroupName, limits);
-      if (isFGroup) {
-        fgroups.put(newGroupName, group);
-      } else {
-        limits.checkGroups(groups.size() + 1);
-        groups.put(newGroupName, group);
-      }
-      if (groupNameInLegacyMap) {
-        LOG.warn("Group " + groupName + " is deprecated. Use " + newGroupName
-            + " instead");
-      }
-    }
-    return group;
-  }
-
-  /**
-   * Returns the total number of counters, by summing the number of counters
-   * in each group.
-   * @return the total number of counters
-   */
-  public synchronized int countCounters() {
-    int result = 0;
-    for (G group : this) {
-      result += group.size();
-    }
-    return result;
-  }
-
-  /**
-   * Write the set of groups.
-   * Counters ::= version #fgroups (groupId, group)* #groups (group)*
-   */
-  @Override
-  public synchronized void write(DataOutput out) throws IOException {
-    WritableUtils.writeVInt(out, groupFactory.version());
-    WritableUtils.writeVInt(out, fgroups.size());  // framework groups first
-    // NOTE(review): a group in fgroups that is neither a framework nor a
-    // file-system group is skipped here although it was counted in the size
-    // written above - confirm that such a group can never be present.
-    for (G group : fgroups.values()) {
-      if (group.getUnderlyingGroup() instanceof FrameworkCounterGroup<?, ?>) {
-        WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
-        WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
-        group.write(out);
-      } else if (group.getUnderlyingGroup() instanceof FileSystemCounterGroup<?>) {
-        WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
-        group.write(out);
-      }
-    }
-    if (writeAllCounters) {
-      WritableUtils.writeVInt(out, groups.size());
-      for (G group : groups.values()) {
-        Text.writeString(out, group.getName());
-        group.write(out);
-      }
-    } else {
-      WritableUtils.writeVInt(out, 0);
-    }
-  }
-
-  /**
-   * Replaces the contents of this object with the groups read from
-   * {@code in}. Groups of both kinds and the enum cache are discarded first,
-   * so a reused instance does not keep groups or counters from its previous
-   * contents.
-   *
-   * @throws IOException if the version does not match the group factory's
-   *         version or a group type in the stream is not recognized
-   */
-  @Override
-  public synchronized void readFields(DataInput in) throws IOException {
-    int version = WritableUtils.readVInt(in);
-    if (version != groupFactory.version()) {
-      throw new IOException("Counters version mismatch, expected "+
-          groupFactory.version() +" got "+ version);
-    }
-    // Cached counters belong to the groups that are about to be replaced.
-    cache.clear();
-    int numFGroups = WritableUtils.readVInt(in);
-    fgroups.clear();
-    GroupType[] groupTypes = GroupType.values();
-    while (numFGroups-- > 0) {
-      int typeOrdinal = WritableUtils.readVInt(in);
-      if (typeOrdinal < 0 || typeOrdinal >= groupTypes.length) {
-        // Report corrupt input as an I/O problem instead of an
-        // ArrayIndexOutOfBoundsException.
-        throw new IOException("Unexpected counter group type: "+ typeOrdinal);
-      }
-      GroupType groupType = groupTypes[typeOrdinal];
-      G group;
-      switch (groupType) {
-        case FILESYSTEM: // with nothing
-          group = groupFactory.newFileSystemGroup();
-          break;
-        case FRAMEWORK:  // with group id
-          group = groupFactory.newFrameworkGroup(WritableUtils.readVInt(in));
-          break;
-        default: // Needed for definite assignment of group
-          throw new IOException("Unexpected counter group type: "+ groupType);
-      }
-      group.readFields(in);
-      fgroups.put(group.getName(), group);
-    }
-    int numGroups = WritableUtils.readVInt(in);
-    // Drop generic groups left over from a previous use of this instance.
-    // NOTE(review): the counter tally kept by the limits object is not reset
-    // here; Limits is not visible from this class - confirm whether it
-    // should be.
-    groups.clear();
-    while (numGroups-- > 0) {
-      limits.checkGroups(groups.size() + 1);
-      G group = groupFactory.newGenericGroup(Text.readString(in), null, limits);
-      group.readFields(in);
-      groups.put(group.getName(), group);
-    }
-  }
-
-  /**
-   * Return textual representation of the counter values.
-   * @return the string
-   */
-  @Override
-  public synchronized String toString() {
-    StringBuilder sb = new StringBuilder("Counters: " + countCounters());
-    for (G group : this) {
-      sb.append("\n\t").append(group.getDisplayName());
-      for (TezCounter counter : group) {
-        sb.append("\n\t\t").append(counter.getDisplayName()).append("=")
-          .append(counter.getValue());
-      }
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Increments multiple counters by their amounts in another Counters
-   * instance. Groups missing from this instance are added first.
-   * @param other the other Counters instance
-   */
-  public synchronized void incrAllCounters(AbstractCounters<C, G> other) {
-    for (G right : other) {
-      String groupName = right.getName();
-      G left = (isFrameworkGroup(groupName) ? fgroups : groups).get(groupName);
-      if (left == null) {
-        left = addGroup(groupName, right.getDisplayName());
-      }
-      left.incrAllCounters(right);
-    }
-  }
-
-  /**
-   * Two counters objects are equal when iterating them yields equal groups
-   * in the same order.
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public boolean equals(Object genericRight) {
-    if (genericRight instanceof AbstractCounters<?, ?>) {
-      return Iterators.elementsEqual(iterator(),
-          ((AbstractCounters<C, G>)genericRight).iterator());
-    }
-    return false;
-  }
-
-  /** @return the hash code of the non-framework group map */
-  @Override
-  public int hashCode() {
-    return groups.hashCode();
-  }
-
-  /**
-   * Set the "writeAllCounters" option to true or false
-   * @param send  if true all counters would be serialized, otherwise only
-   *              framework counters would be serialized in
-   *              {@link #write(DataOutput)}
-   */
-  @InterfaceAudience.Private
-  public void setWriteAllCounters(boolean send) {
-    writeAllCounters = send;
-  }
-
-  /**
-   * Get the "writeAllCounters" option
-   * @return true if all counters would be serialized
-   */
-  @InterfaceAudience.Private
-  public boolean getWriteAllCounters() {
-    return writeAllCounters;
-  }
-
-  /** @return the limits object shared by the groups created by this object */
-  @InterfaceAudience.Private
-  public Limits limits() {
-    return limits;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroup.java b/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroup.java
deleted file mode 100644
index bc7986d..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroup.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common.counters;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * A set of {@link TezCounter}s that logically belong together. Typically the
- * group corresponds to an {@link Enum} subclass whose values are the
- * counters.
- *
- * <p>The interface declares no members of its own; it only fixes the type
- * parameter of {@link CounterGroupBase} to {@link TezCounter} so that users
- * do not have to write the generic form.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Stable
-public interface CounterGroup extends CounterGroupBase<TezCounter> {
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java b/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java
deleted file mode 100644
index 3b702ba..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common.counters;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Writable;
-
-/**
- * The common counter group interface: a named, writable and iterable
- * collection of counters.
- *
- * @param <T> type of the counter for the group
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface CounterGroupBase<T extends TezCounter>
-    extends Writable, Iterable<T> {
-
-  /**
-   * Get the internal name of the group
-   * @return the internal name
-   */
-  String getName();
-
-  /**
-   * Get the display name of the group.
-   * @return the human readable name
-   */
-  String getDisplayName();
-
-  /**
-   * Set the display name of the group
-   * @param displayName of the group
-   */
-  void setDisplayName(String displayName);
-
-  /**
-   * Add a counter to this group.
-   * @param counter to add
-   */
-  void addCounter(T counter);
-
-  /**
-   * Add a counter to this group
-   * @param name  of the counter
-   * @param displayName of the counter
-   * @param value of the counter
-   * @return the counter
-   */
-  T addCounter(String name, String displayName, long value);
-
-  /**
-   * Find a counter in the group.
-   * @param counterName the name of the counter
-   * @param displayName the display name of the counter
-   * @return the counter that was found or added
-   */
-  T findCounter(String counterName, String displayName);
-
-  /**
-   * Find a counter in the group
-   * @param counterName the name of the counter
-   * @param create create the counter if not found if true
-   * @return the counter that was found or added or null if create is false
-   */
-  T findCounter(String counterName, boolean create);
-
-  /**
-   * Find a counter in the group.
-   * @param counterName the name of the counter
-   * @return the counter that was found or added
-   */
-  T findCounter(String counterName);
-
-  /**
-   * @return the number of counters in this group.
-   */
-  int size();
-
-  /**
-   * Increment all counters by a group of counters
-   * @param rightGroup  the group to be added to this group
-   */
-  void incrAllCounters(CounterGroupBase<T> rightGroup);
-
-  // The doc comment has to come before the annotation: a doc comment placed
-  // between an annotation and the rest of the declaration is not attached to
-  // the method.
-  /**
-   * Exposes the underlying group type if a facade.
-   * @return the underlying object that this object is wrapping up.
-   */
-  @Private
-  CounterGroupBase<T> getUnderlyingGroup();
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupFactory.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupFactory.java b/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupFactory.java
deleted file mode 100644
index 45da0dd..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupFactory.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common.counters;
-
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * An abstract class to provide common implementation of the
- * group factory in both mapred and mapreduce packages.
- *
- * @param <C> type of the counter
- * @param <G> type of the group
- */
-@InterfaceAudience.Private
-public abstract class CounterGroupFactory<C extends TezCounter,
- G extends CounterGroupBase<C>> {
-
- public interface FrameworkGroupFactory<F> {
- F newGroup(String name);
- }
-
- // Integer mapping (for serialization) for framework groups
- private static final Map<String, Integer> s2i = Maps.newHashMap();
- private static final List<String> i2s = Lists.newArrayList();
- private static final int VERSION = 1;
- private static final String FS_GROUP_NAME = FileSystemCounter.class.getName();
-
- private final Map<String, FrameworkGroupFactory<G>> fmap = Maps.newHashMap();
- {
- // Add builtin counter class here and the version when changed.
- addFrameworkGroup(TaskCounter.class);
- addFrameworkGroup(JobCounter.class);
- addFrameworkGroup(DAGCounter.class);
- }
-
- // Initialize the framework counter group mapping
- private synchronized <T extends Enum<T>>
- void addFrameworkGroup(final Class<T> cls) {
- updateFrameworkGroupMapping(cls);
- fmap.put(cls.getName(), newFrameworkGroupFactory(cls));
- }
-
- // Update static mappings (c2i, i2s) of framework groups
- private static synchronized void updateFrameworkGroupMapping(Class<?> cls) {
- String name = cls.getName();
- Integer i = s2i.get(name);
- if (i != null) return;
- i2s.add(name);
- s2i.put(name, i2s.size() - 1);
- }
-
- /**
- * Required override to return a new framework group factory
- * @param <T> type of the counter enum class
- * @param cls the counter enum class
- * @return a new framework group factory
- */
- protected abstract <T extends Enum<T>>
- FrameworkGroupFactory<G> newFrameworkGroupFactory(Class<T> cls);
-
- /**
- * Create a new counter group
- * @param name of the group
- * @param limits the counters limits policy object
- * @return a new counter group
- */
- public G newGroup(String name, Limits limits) {
- return newGroup(name, ResourceBundles.getCounterGroupName(name, name),
- limits);
- }
-
- /**
- * Create a new counter group
- * @param name of the group
- * @param displayName of the group
- * @param limits the counters limits policy object
- * @return a new counter group
- */
- public G newGroup(String name, String displayName, Limits limits) {
- FrameworkGroupFactory<G> gf = fmap.get(name);
- if (gf != null) return gf.newGroup(name);
- if (name.equals(FS_GROUP_NAME)) {
- return newFileSystemGroup();
- } else if (s2i.get(name) != null) {
- return newFrameworkGroup(s2i.get(name));
- }
- return newGenericGroup(name, displayName, limits);
- }
-
- /**
- * Create a new framework group
- * @param id of the group
- * @return a new framework group
- */
- public G newFrameworkGroup(int id) {
- String name;
- synchronized(CounterGroupFactory.class) {
- if (id < 0 || id >= i2s.size()) throwBadFrameGroupIdException(id);
- name = i2s.get(id); // should not throw here.
- }
- FrameworkGroupFactory<G> gf = fmap.get(name);
- if (gf == null) throwBadFrameGroupIdException(id);
- return gf.newGroup(name);
- }
-
- /**
- * Get the id of a framework group
- * @param name of the group
- * @return the framework group id
- */
- public static synchronized int getFrameworkGroupId(String name) {
- Integer i = s2i.get(name);
- if (i == null) throwBadFrameworkGroupNameException(name);
- return i;
- }
-
- /**
- * @return the counter factory version
- */
- public int version() {
- return VERSION;
- }
-
- /**
- * Check whether a group name is a name of a framework group (including
- * the filesystem group).
- *
- * @param name to check
- * @return true for framework group names
- */
- public static synchronized boolean isFrameworkGroup(String name) {
- return s2i.get(name) != null || name.equals(FS_GROUP_NAME);
- }
-
- private static void throwBadFrameGroupIdException(int id) {
- throw new IllegalArgumentException("bad framework group id: "+ id);
- }
-
- private static void throwBadFrameworkGroupNameException(String name) {
- throw new IllegalArgumentException("bad framework group name: "+ name);
- }
-
- /**
- * Abstract factory method to create a generic (vs framework) counter group
- * @param name of the group
- * @param displayName of the group
- * @param limits limits of the counters
- * @return a new generic counter group
- */
- protected abstract G newGenericGroup(String name, String displayName,
- Limits limits);
-
- /**
- * Abstract factory method to create a file system counter group
- * @return a new file system counter group
- */
- protected abstract G newFileSystemGroup();
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/counters/DAGCounter.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-common/src/main/java/org/apache/tez/common/counters/DAGCounter.java
deleted file mode 100644
index 3598572..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/counters/DAGCounter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common.counters;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Per-job counters.
- *
- * <p>NOTE(review): the enum is named {@code DAGCounter}, so "job" presumably
- * refers to a DAG here - confirm. The constants are kept in their original
- * order, because reordering would change their ordinals.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public enum DAGCounter {
-  NUM_FAILED_TASKS,
-  NUM_KILLED_TASKS,
-  TOTAL_LAUNCHED_TASKS,
-  OTHER_LOCAL_TASKS,
-  DATA_LOCAL_TASKS,
-  RACK_LOCAL_TASKS,
-  SLOTS_MILLIS_TASKS,
-  FALLOW_SLOTS_MILLIS_TASKS,
-  TOTAL_LAUNCHED_UBERTASKS,
-  NUM_UBER_SUBTASKS,
-  NUM_FAILED_UBERTASKS
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
deleted file mode 100644
index 08f4c5d..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common.counters;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-@InterfaceAudience.Private
-public enum FileSystemCounter {
- BYTES_READ,
- BYTES_WRITTEN,
- READ_OPS,
- LARGE_READ_OPS,
- WRITE_OPS,
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java b/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
deleted file mode 100644
index d4b167a..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common.counters;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.Iterator;
-import java.util.Locale;
-import java.util.Map;
-
-import com.google.common.base.Joiner;
-import static com.google.common.base.Preconditions.*;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * An abstract class to provide common implementation of the filesystem
- * counter group in both mapred and mapreduce packages.
- *
- * @param <C> the type of the Counter for the group
- */
-@InterfaceAudience.Private
-public abstract class FileSystemCounterGroup<C extends TezCounter>
- implements CounterGroupBase<C> {
-
- static final int MAX_NUM_SCHEMES = 100; // intern/sanity check
- static final ConcurrentMap<String, String> schemes = Maps.newConcurrentMap();
-
- // C[] would need Array.newInstance which requires a Class<C> reference.
- // Just a few local casts probably worth not having to carry it around.
- private final Map<String, Object[]> map =
- new ConcurrentSkipListMap<String, Object[]>();
- private String displayName;
-
- private static final Joiner NAME_JOINER = Joiner.on('_');
- private static final Joiner DISP_JOINER = Joiner.on(": ");
-
- @InterfaceAudience.Private
- public static class FSCounter extends AbstractCounter {
- final String scheme;
- final FileSystemCounter key;
- private long value;
-
- public FSCounter(String scheme, FileSystemCounter ref) {
- this.scheme = scheme;
- key = ref;
- }
-
- @Override
- public String getName() {
- return NAME_JOINER.join(scheme, key.name());
- }
-
- @Override
- public String getDisplayName() {
- return DISP_JOINER.join(scheme, localizeCounterName(key.name()));
- }
-
- protected String localizeCounterName(String counterName) {
- return ResourceBundles.getCounterName(FileSystemCounter.class.getName(),
- counterName, counterName);
- }
-
- @Override
- public long getValue() {
- return value;
- }
-
- @Override
- public void setValue(long value) {
- this.value = value;
- }
-
- @Override
- public void increment(long incr) {
- value += incr;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- assert false : "shouldn't be called";
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- assert false : "shouldn't be called";
- }
-
- @Override
- public TezCounter getUnderlyingCounter() {
- return this;
- }
- }
-
- @Override
- public String getName() {
- return FileSystemCounter.class.getName();
- }
-
- @Override
- public String getDisplayName() {
- if (displayName == null) {
- displayName = ResourceBundles.getCounterGroupName(getName(),
- "File System Counters");
- }
- return displayName;
- }
-
- @Override
- public void setDisplayName(String displayName) {
- this.displayName = displayName;
- }
-
- @Override
- public void addCounter(C counter) {
- C ours;
- if (counter instanceof FileSystemCounterGroup.FSCounter) {
- FSCounter c = (FSCounter) counter;
- ours = findCounter(c.scheme, c.key);
- }
- else {
- ours = findCounter(counter.getName());
- }
- ours.setValue(counter.getValue());
- }
-
- @Override
- public C addCounter(String name, String displayName, long value) {
- C counter = findCounter(name);
- counter.setValue(value);
- return counter;
- }
-
- // Parse generic counter name into [scheme, key]
- private String[] parseCounterName(String counterName) {
- int schemeEnd = counterName.indexOf('_');
- if (schemeEnd < 0) {
- throw new IllegalArgumentException("bad fs counter name");
- }
- return new String[]{counterName.substring(0, schemeEnd),
- counterName.substring(schemeEnd + 1)};
- }
-
- @Override
- public C findCounter(String counterName, String displayName) {
- return findCounter(counterName);
- }
-
- @Override
- public C findCounter(String counterName, boolean create) {
- try {
- String[] pair = parseCounterName(counterName);
- return findCounter(pair[0], FileSystemCounter.valueOf(pair[1]));
- }
- catch (Exception e) {
- if (create) throw new IllegalArgumentException(e);
- return null;
- }
- }
-
- @Override
- public C findCounter(String counterName) {
- return findCounter(counterName, true);
- }
-
- @SuppressWarnings("unchecked")
- public synchronized C findCounter(String scheme, FileSystemCounter key) {
- final String canonicalScheme = checkScheme(scheme);
- Object[] counters = map.get(canonicalScheme);
- int ord = key.ordinal();
- if (counters == null) {
- counters = new Object[FileSystemCounter.values().length];
- map.put(canonicalScheme, counters);
- counters[ord] = newCounter(canonicalScheme, key);
- }
- else if (counters[ord] == null) {
- counters[ord] = newCounter(canonicalScheme, key);
- }
- return (C) counters[ord];
- }
-
- private String checkScheme(String scheme) {
- String fixed = scheme.toUpperCase(Locale.US);
- String interned = schemes.putIfAbsent(fixed, fixed);
- if (schemes.size() > MAX_NUM_SCHEMES) {
- // mistakes or abuses
- throw new IllegalArgumentException("too many schemes? "+ schemes.size() +
- " when process scheme: "+ scheme);
- }
- return interned == null ? fixed : interned;
- }
-
- /**
- * Abstract factory method to create a file system counter
- * @param scheme of the file system
- * @param key the enum of the file system counter
- * @return a new file system counter
- */
- protected abstract C newCounter(String scheme, FileSystemCounter key);
-
- @Override
- public int size() {
- int n = 0;
- for (Object[] counters : map.values()) {
- n += numSetCounters(counters);
- }
- return n;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void incrAllCounters(CounterGroupBase<C> other) {
- if (checkNotNull(other.getUnderlyingGroup(), "other group")
- instanceof FileSystemCounterGroup<?>) {
- for (TezCounter counter : other) {
- FSCounter c = (FSCounter) ((TezCounter)counter).getUnderlyingCounter();
- findCounter(c.scheme, c.key) .increment(counter.getValue());
- }
- }
- }
-
- /**
- * FileSystemGroup ::= #scheme (scheme #counter (key value)*)*
- */
- @Override
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeVInt(out, map.size()); // #scheme
- for (Map.Entry<String, Object[]> entry : map.entrySet()) {
- WritableUtils.writeString(out, entry.getKey()); // scheme
- // #counter for the above scheme
- WritableUtils.writeVInt(out, numSetCounters(entry.getValue()));
- for (Object counter : entry.getValue()) {
- if (counter == null) continue;
- @SuppressWarnings("unchecked")
- FSCounter c = (FSCounter) ((TezCounter)counter).getUnderlyingCounter();
- WritableUtils.writeVInt(out, c.key.ordinal()); // key
- WritableUtils.writeVLong(out, c.getValue()); // value
- }
- }
- }
-
- private int numSetCounters(Object[] counters) {
- int n = 0;
- for (Object counter : counters) if (counter != null) ++n;
- return n;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int numSchemes = WritableUtils.readVInt(in); // #scheme
- FileSystemCounter[] enums = FileSystemCounter.values();
- for (int i = 0; i < numSchemes; ++i) {
- String scheme = WritableUtils.readString(in); // scheme
- int numCounters = WritableUtils.readVInt(in); // #counter
- for (int j = 0; j < numCounters; ++j) {
- findCounter(scheme, enums[WritableUtils.readVInt(in)]) // key
- .setValue(WritableUtils.readVLong(in)); // value
- }
- }
- }
-
- @Override
- public Iterator<C> iterator() {
- return new AbstractIterator<C>() {
- Iterator<Object[]> it = map.values().iterator();
- Object[] counters = it.hasNext() ? it.next() : null;
- int i = 0;
- @Override
- protected C computeNext() {
- while (counters != null) {
- while (i < counters.length) {
- @SuppressWarnings("unchecked")
- C counter = (C) counters[i++];
- if (counter != null) return counter;
- }
- i = 0;
- counters = it.hasNext() ? it.next() : null;
- }
- return endOfData();
- }
- };
- }
-
- @Override
- public synchronized boolean equals(Object genericRight) {
- if (genericRight instanceof CounterGroupBase<?>) {
- @SuppressWarnings("unchecked")
- CounterGroupBase<C> right = (CounterGroupBase<C>) genericRight;
- return Iterators.elementsEqual(iterator(), right.iterator());
- }
- return false;
- }
-
- @Override
- public synchronized int hashCode() {
- // need to be deep as counters is an array
- int hash = FileSystemCounter.class.hashCode();
- for (Object[] counters : map.values()) {
- if (counters != null) hash ^= Arrays.hashCode(counters);
- }
- return hash;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java b/tez-common/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
deleted file mode 100644
index 42fb636..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common.counters;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.io.WritableUtils;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
-
-/**
- * An abstract class to provide common implementation for the framework
- * counter group in both mapred and mapreduce packages.
- *
- * @param <T> type of the counter enum class
- * @param <C> type of the counter
- */
-@InterfaceAudience.Private
-public abstract class FrameworkCounterGroup<T extends Enum<T>,
- C extends TezCounter> implements CounterGroupBase<C> {
- private static final Log LOG = LogFactory.getLog(FrameworkCounterGroup.class);
-
- private final Class<T> enumClass; // for Enum.valueOf
- private final Object[] counters; // local casts are OK and save a class ref
- private String displayName = null;
-
- /**
- * A counter facade for framework counters.
- * Use old (which extends new) interface to make compatibility easier.
- */
- @InterfaceAudience.Private
- public static class FrameworkCounter<T extends Enum<T>> extends AbstractCounter {
- final T key;
- final String groupName;
- private long value;
-
- public FrameworkCounter(T ref, String groupName) {
- key = ref;
- this.groupName = groupName;
- }
-
- @Override
- public String getName() {
- return key.name();
- }
-
- @Override
- public String getDisplayName() {
- return ResourceBundles.getCounterName(groupName, getName(), getName());
- }
-
- @Override
- public long getValue() {
- return value;
- }
-
- @Override
- public void setValue(long value) {
- this.value = value;
- }
-
- @Override
- public void increment(long incr) {
- value += incr;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- assert false : "shouldn't be called";
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- assert false : "shouldn't be called";
- }
-
- @Override
- public TezCounter getUnderlyingCounter() {
- return this;
- }
- }
-
- @SuppressWarnings("unchecked")
- public FrameworkCounterGroup(Class<T> enumClass) {
- this.enumClass = enumClass;
- T[] enums = enumClass.getEnumConstants();
- counters = new Object[enums.length];
- }
-
- @Override
- public String getName() {
- return enumClass.getName();
- }
-
- @Override
- public String getDisplayName() {
- if (displayName == null) {
- displayName = ResourceBundles.getCounterGroupName(getName(), getName());
- }
- return displayName;
- }
-
- @Override
- public void setDisplayName(String displayName) {
- this.displayName = displayName;
- }
-
- private T valueOf(String name) {
- return Enum.valueOf(enumClass, name);
- }
-
- @Override
- public void addCounter(C counter) {
- C ours = findCounter(counter.getName());
- ours.setValue(counter.getValue());
- }
-
- @Override
- public C addCounter(String name, String displayName, long value) {
- C counter = findCounter(name);
- counter.setValue(value);
- return counter;
- }
-
- @Override
- public C findCounter(String counterName, String displayName) {
- return findCounter(counterName);
- }
-
- @Override
- public C findCounter(String counterName, boolean create) {
- try {
- return findCounter(valueOf(counterName));
- }
- catch (Exception e) {
- if (create) throw new IllegalArgumentException(e);
- return null;
- }
- }
-
- @Override
- public C findCounter(String counterName) {
- return findCounter(valueOf(counterName));
- }
-
- @SuppressWarnings("unchecked")
- private C findCounter(T key) {
- int i = key.ordinal();
- if (counters[i] == null) {
- counters[i] = newCounter(key);
- }
- return (C) counters[i];
- }
-
- /**
- * Abstract factory method for new framework counter
- * @param key for the enum value of a counter
- * @return a new counter for the key
- */
- protected abstract C newCounter(T key);
-
- @Override
- public int size() {
- int n = 0;
- for (int i = 0; i < counters.length; ++i) {
- if (counters[i] != null) ++n;
- }
- return n;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void incrAllCounters(CounterGroupBase<C> other) {
- if (checkNotNull(other, "other counter group")
- instanceof FrameworkCounterGroup<?, ?>) {
- for (TezCounter counter : other) {
- findCounter(((FrameworkCounter) counter).key.name())
- .increment(counter.getValue());
- }
- }
- }
-
- /**
- * FrameworkGroup ::= #counter (key value)*
- */
- @Override
- @SuppressWarnings("unchecked")
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeVInt(out, size());
- for (int i = 0; i < counters.length; ++i) {
- TezCounter counter = (C) counters[i];
- if (counter != null) {
- WritableUtils.writeVInt(out, i);
- WritableUtils.writeVLong(out, counter.getValue());
- }
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- clear();
- int len = WritableUtils.readVInt(in);
- T[] enums = enumClass.getEnumConstants();
- for (int i = 0; i < len; ++i) {
- int ord = WritableUtils.readVInt(in);
- TezCounter counter = newCounter(enums[ord]);
- counter.setValue(WritableUtils.readVLong(in));
- counters[ord] = counter;
- }
- }
-
- private void clear() {
- for (int i = 0; i < counters.length; ++i) {
- counters[i] = null;
- }
- }
-
- @Override
- public Iterator<C> iterator() {
- return new AbstractIterator<C>() {
- int i = 0;
- @Override
- protected C computeNext() {
- while (i < counters.length) {
- @SuppressWarnings("unchecked")
- C counter = (C) counters[i++];
- if (counter != null) return counter;
- }
- return endOfData();
- }
- };
- }
-
- @Override
- public boolean equals(Object genericRight) {
- if (genericRight instanceof CounterGroupBase<?>) {
- @SuppressWarnings("unchecked")
- CounterGroupBase<C> right = (CounterGroupBase<C>) genericRight;
- return Iterators.elementsEqual(iterator(), right.iterator());
- }
- return false;
- }
-
- @Override
- public synchronized int hashCode() {
- // need to be deep as counters is an array
- return Arrays.deepHashCode(new Object[]{enumClass, counters, displayName});
- }
-}