You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC

svn commit: r1469642 [6/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ te...

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+
+// TODO EVENTUALLY A description for each property.
+/**
+ * Meant for user configurable job properties; most keys are declared next to
+ * the default value that goes with them. For other constants look at
+ * {@link Constants}.
+ */
+@Private
+@Evolving
+public class TezJobConfig {
+  /** The number of milliseconds between progress reports. */
+  public static final int PROGRESS_INTERVAL = 3000;
+
+  /**
+   * Default record count between progress reports while combining (presumed
+   * from the name); no matching configuration key is defined in this class.
+   */
+  public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;
+
+  /**
+   * Configuration key to enable/disable IFile readahead (enabled by default).
+   */
+  public static final String TEZ_ENGINE_IFILE_READAHEAD =
+      "tez.engine.ifile.readahead";
+  public static final boolean DEFAULT_TEZ_ENGINE_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes (default 4 MiB).
+   */
+  public static final String TEZ_ENGINE_IFILE_READAHEAD_BYTES =
+      "tez.engine.ifile.readahead.bytes";
+  public static final int DEFAULT_TEZ_ENGINE_IFILE_READAHEAD_BYTES =
+      4 * 1024 * 1024;
+
+  /**
+   * Records merged between progress reports (meaning presumed from the key).
+   */
+  public static final String RECORDS_BEFORE_PROGRESS =
+      "tez.task.merge.progress.records";
+  public static final long DEFAULT_RECORDS_BEFORE_PROGRESS = 10000;
+
+  /**
+   * Key for the engine's local directory; defaults to {@code /tmp}.
+   */
+  public static final String LOCAL_DIR = "tez.engine.local.dir";
+  public static final String DEFAULT_LOCAL_DIR = "/tmp";
+
+  /**
+   * Key for a task's in-degree; defaults to 1.
+   */
+  public static final String TEZ_ENGINE_TASK_INDEGREE =
+      "tez.engine.task.in-degree";
+  public static final int DEFAULT_TEZ_ENGINE_TASK_INDEGREE = 1;
+
+  /**
+   * Key for a task's out-degree; defaults to 1.
+   */
+  public static final String TEZ_ENGINE_TASK_OUTDEGREE =
+      "tez.engine.task.out-degree";
+  public static final int DEFAULT_TEZ_ENGINE_TASK_OUTDEGREE = 1;
+
+  /**
+   * Key for the sort factor; defaults to 100.
+   */
+  public static final String TEZ_ENGINE_IO_SORT_FACTOR =
+      "tez.engine.io.sort.factor";
+  public static final int DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR = 100;
+
+  /**
+   * Key for the sort spill percent; the default, 0.8, is written as a fraction.
+   */
+  public static final String TEZ_ENGINE_SORT_SPILL_PERCENT =
+      "tez.engine.sort.spill.percent";
+  public static final float DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT = 0.8f;
+
+  /**
+   * Key for the sort memory setting, in MB going by the key name; defaults to 100.
+   */
+  public static final String TEZ_ENGINE_IO_SORT_MB = "tez.engine.io.sort.mb";
+  public static final int DEFAULT_TEZ_ENGINE_IO_SORT_MB = 100;
+
+  /**
+   * Key for the index cache memory limit in bytes; defaults to 1 MiB.
+   */
+  public static final String TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES =
+      "tez.engine.index.cache.memory.limit.bytes";
+  public static final int DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES =
+      1024 * 1024;
+
+  /**
+   * Key for the combine minimum-spills setting; defaults to 3.
+   */
+  public static final String TEZ_ENGINE_COMBINE_MIN_SPILLS =
+      "tez.engine.combine.min.spills";
+  public static final int DEFAULT_TEZ_ENGINE_COMBINE_MIN_SPILLS = 3;
+
+  /**
+   * Key for the number of sort threads; defaults to 1.
+   */
+  public static final String TEZ_ENGINE_SORT_THREADS =
+      "tez.engine.sort.threads";
+  public static final int DEFAULT_TEZ_ENGINE_SORT_THREADS = 1;
+
+  /**
+   * Key for the maximum number of counters (per the key name); defaults to 120.
+   */
+  public static final String COUNTERS_MAX_KEY = "tez.engine.job.counters.max";
+  public static final int COUNTERS_MAX_DEFAULT = 120;
+
+  /**
+   * Key for the maximum counter group name length (presumed); defaults to 128.
+   */
+  public static final String COUNTER_GROUP_NAME_MAX_KEY = "tez.engine.job.counters.group.name.max";
+  public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;
+
+  /**
+   * Key for the maximum counter name length (presumed); defaults to 64.
+   */
+  public static final String COUNTER_NAME_MAX_KEY = "tez.engine.job.counters.counter.name.max";
+  public static final int COUNTER_NAME_MAX_DEFAULT = 64;
+
+  /**
+   * Key for the maximum number of counter groups (presumed); defaults to 50.
+   */
+  public static final String COUNTER_GROUPS_MAX_KEY = "tez.engine.job.counters.groups.max";
+  public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;
+
+  /**
+   * Temporary interface for MR only (not chained Tez) to indicate whether
+   * in-memory shuffle should be used.
+   */
+  @Private
+  public static final String TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY =
+      "tez.engine.shuffle.use.in-memory";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY = false;
+
+  /**
+   * Key for the number of parallel shuffle copies; defaults to 20.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES =
+      "tez.engine.shuffle.parallel.copies";
+  public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES = 20;
+
+  /**
+   * TODO Is this user configurable? Defaults to the empty string.
+   */
+  public static final String TEZ_ENGINE_METRICS_SESSION_ID =
+      "tez.engine.metrics.session.id";
+  public static final String DEFAULT_TEZ_ENGINE_METRICS_SESSION_ID = "";
+
+  /**
+   * Key for the shuffle fetch failures limit; defaults to 10.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_FETCH_FAILURES =
+      "tez.engine.shuffle.fetch.failures.limit";
+  public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_FETCH_FAILURES_LIMIT = 10;
+
+  /**
+   * Key for the shuffle notify-on-read-error flag; defaults to true.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR =
+      "tez.engine.shuffle.notify.readerror";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR = true;
+
+  /**
+   * Key for the shuffle connect timeout. NOTE(review): the default declared
+   * below is named STALLED_COPY_TIMEOUT, not CONNECT_TIMEOUT; confirm the pairing.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT =
+      "tez.engine.shuffle.connect.timeout";
+  public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT =
+      3 * 60 * 1000;
+
+  /**
+   * Key for the shuffle read timeout; defaults to 3 * 60 * 1000 (presumably ms).
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_READ_TIMEOUT = "tez.engine.shuffle.read.timeout";
+  public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT =
+      3 * 60 * 1000;
+
+  /**
+   * Key for enabling SSL for shuffle; defaults to false.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_ENABLE_SSL =
+      "tez.engine.shuffle.ssl.enable";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL = false;
+
+  /**
+   * Key for the shuffle input buffer percent; defaults to 0.90.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT =
+      "tez.engine.shuffle.input.buffer.percent";
+  public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT =
+      0.90f;
+
+  /**
+   * Key for the shuffle memory limit percent; defaults to 0.25.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT =
+      "tez.engine.shuffle.memory.limit.percent";
+  public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT =
+      0.25f;
+
+  /**
+   * Key for the shuffle merge percent; defaults to 0.90.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_MERGE_PERCENT =
+      "tez.engine.shuffle.merge.percent";
+  public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_MERGE_PERCENT = 0.90f;
+
+  /**
+   * TODO TEZAM3 default value ?
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS =
+      "tez.engine.shuffle.memory-to-memory.segments";
+
+  /**
+   * Key for the shuffle memory-to-memory enable flag; defaults to false.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM =
+      "tez.engine.shuffle.memory-to-memory.enable";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM =
+      false;
+
+  /**
+   * Key for the task input buffer percent; defaults to 0.0.
+   */
+  public static final String TEZ_ENGINE_INPUT_BUFFER_PERCENT =
+      "tez.engine.task.input.buffer.percent";
+  public static final float DEFAULT_TEZ_ENGINE_INPUT_BUFFER_PERCENT = 0.0f;
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.engine.api.Partitioner;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/** Base class for Tez tasks; its identity fields travel via {@link Writable}. */
+public abstract class TezTask implements Writable {
+  // Serialized fields: written by write() and restored by readFields().
+  private TezTaskAttemptID taskAttemptId;
+  private String user;
+  private String jobName;
+  private String vertexName;
+
+  // Runtime-only state; write() and readFields() below do not touch it.
+  protected SecretKey jobTokenSecret;
+  protected TezTaskReporter reporter;
+  protected Partitioner partitioner;
+  protected Processor combineProcessor;
+  protected TezTaskStatus status;
+  protected Progress progress = new Progress();
+
+  public TezTask() {
+  }
+
+  public TezTask(TezTaskAttemptID taskAttemptID, String user, String jobName,
+      String vertexName) {
+    this.taskAttemptId = taskAttemptID;
+    this.user = user;
+    this.jobName = jobName;
+    this.vertexName = vertexName;
+  }
+
+  public TezTaskAttemptID getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  public Progress getProgress() {
+    return progress;
+  }
+
+  /** Walks attempt id -> task id -> vertex id -> DAG id; NPEs if the attempt id is unset. */
+  public TezDAGID getDAGID() {
+    return taskAttemptId.getTaskID().getVertexID().getDAGId();
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public String getVertexName() {
+    return this.vertexName;
+  }
+
+  public SecretKey getJobTokenSecret() {
+    return jobTokenSecret;
+  }
+
+  public void setJobTokenSecret(SecretKey jobTokenSecret) {
+    this.jobTokenSecret = jobTokenSecret;
+  }
+
+  public TezTaskStatus getStatus() {
+    return status;
+  }
+
+  public TezTaskReporter getTaskReporter() {
+    return reporter;
+  }
+
+  public Processor getCombineProcessor() {
+    return combineProcessor;
+  }
+
+  public Partitioner getPartitioner() {
+    return partitioner;
+  }
+
+  /** Does nothing in this base class (presumably a hook for subclasses; confirm). */
+  public void statusUpdate() throws IOException, InterruptedException {
+  }
+
+  /** Writes attempt id, user, job name and vertex name; readFields() mirrors this order. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskAttemptId.write(out);
+    Text.writeString(out, user);
+    Text.writeString(out, jobName);
+    Text.writeString(out, vertexName);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskAttemptId = TezTaskAttemptID.read(in);
+    user = Text.readString(in);
+    jobName = Text.readString(in);
+    vertexName = Text.readString(in);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskReporter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskReporter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskReporter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskReporter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,116 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
+
+/** Reporting callbacks for a task: status, progress, counters and fatal errors. */
+public interface TezTaskReporter extends Progressable, Master {
+  void setStatus(String status);
+
+  float getProgress();
+
+  void setProgress(float progress);
+
+  void progress();
+
+  TezCounter getCounter(String group, String name);
+
+  TezCounter getCounter(Enum<?> name);
+
+  void incrCounter(String group, String counter, long amount);
+
+  void incrCounter(Enum<?> key, long amount);
+
+  void reportFatalError(TezTaskAttemptID taskAttemptId,
+      Throwable exception, String logMsg);
+
+  /** Do-nothing reporter: mutators are ignored, getters return 0.0f, 0 or null. */
+  TezTaskReporter NULL = new TezTaskReporter() {
+    @Override
+    public void setStatus(String status) {
+    }
+
+    @Override
+    public float getProgress() {
+      return 0.0f;
+    }
+
+    @Override
+    public void setProgress(float progress) {
+    }
+
+    @Override
+    public void progress() {
+    }
+
+    @Override
+    public TezCounter getCounter(String group, String name) {
+      return null;
+    }
+
+    @Override
+    public TezCounter getCounter(Enum<?> name) {
+      return null;
+    }
+
+    @Override
+    public void incrCounter(String group, String counter, long amount) {
+    }
+
+    @Override
+    public void incrCounter(Enum<?> key, long amount) {
+    }
+
+    @Override
+    public void reportFatalError(TezTaskAttemptID taskAttemptId,
+        Throwable exception, String logMsg) {
+      // TODO Auto-generated method stub
+    }
+
+    // The remaining methods are not declared on TezTaskReporter itself.
+    @Override
+    public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+        int fromEventIdx, int maxEventsToFetch,
+        TezTaskAttemptID reduce) {
+      return null;
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      // TODO TEZAM3
+      return 0;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      // TODO TEZAM3
+      return null;
+    }
+  };
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskReporter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskStatus.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskStatus.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskStatus.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,104 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.common;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/** Writable status record of a task attempt: progress, state, times, counters. */
+public interface TezTaskStatus extends Writable {
+  // Phases a task can report itself to be in.
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  enum Phase { STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP }
+
+  // Run states a task can be in.
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  enum State { RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
+               COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN }
+
+  TezTaskAttemptID getTaskAttemptId();
+
+  float getProgress();
+
+  void setProgress(float progress);
+
+  State getRunState();
+
+  void setRunState(State runState);
+
+  String getDiagnosticInfo();
+
+  void setDiagnosticInfo(String info);
+
+  // TODOTEZDAG Remove stateString / rename
+  String getStateString();
+
+  void setStateString(String stateString);
+
+  long getFinishTime();
+
+  void setFinishTime(long finishTime);
+
+  // TODOTEZDAG Can shuffle / merge be made generic ? Otherwise just a single finish time.
+  long getShuffleFinishTime();
+
+  void setShuffleFinishTime(long shuffleFinishTime);
+
+  long getMapFinishTime();
+
+  void setMapFinishTime(long mapFinishTime);
+
+  long getSortFinishTime();
+
+  void setSortFinishTime(long sortFinishTime);
+
+  long getStartTime();
+
+  void setStartTime(long startTime);
+
+  // TODOTEZDAG Remove phase
+  Phase getPhase();
+
+  void setPhase(Phase phase);
+
+  TezCounters getCounters();
+
+  void setCounters(TezCounters counters);
+
+  List<TezTaskAttemptID> getFailedDependencies();
+
+  void addFailedDependency(TezTaskAttemptID taskAttempttId);
+
+  void clearStatus();
+
+  void statusUpdate(float f, String string, TezCounters counters);
+
+  // TODOTEZDAG maybe remove ?
+  // NOTE(review): getter says "LocalOutputSize", setter "OutputSize"; presumably one value - confirm.
+  long getLocalOutputSize();
+
+  void setOutputSize(long l);
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskStatus.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Skeleton {@link TezCounter} that compares and hashes counters by name, display
+ * name and value; {@code equals} holds both counters' monitors while comparing.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractCounter implements TezCounter {
+
+  @Deprecated
+  @Override
+  public void setDisplayName(String name) { /* no-op */ }
+
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (!(genericRight instanceof TezCounter)) {
+      return false;
+    }
+    synchronized (genericRight) {
+      final TezCounter other = (TezCounter) genericRight;
+      final boolean sameNames = getName().equals(other.getName())
+          && getDisplayName().equals(other.getDisplayName());
+      return sameNames && getValue() == other.getValue();
+    }
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return Objects.hashCode(getName(), getDisplayName(), getValue());
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Base implementation of a named counter group. Counters are kept in a map
+ * sorted by counter name, and every added counter is reported to {@link Limits}.
+ *
+ * @param <T> type of the counter for the group
+ */
+@InterfaceAudience.Private
+public abstract class AbstractCounterGroup<T extends TezCounter>
+    implements CounterGroupBase<T> {
+
+  private final String name;
+  private String displayName;
+  private final ConcurrentMap<String, T> counters =
+      new ConcurrentSkipListMap<String, T>();
+  private final Limits limits;
+
+  /** Creates an empty group with the given name, display name and limits. */
+  public AbstractCounterGroup(String name, String displayName, Limits limits) {
+    this.name = name;
+    this.displayName = displayName;
+    this.limits = limits;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public synchronized String getDisplayName() {
+    return displayName;
+  }
+
+  @Override
+  public synchronized void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  @Override
+  public synchronized void addCounter(T counter) {
+    counters.put(counter.getName(), counter);
+    limits.incrCounters();
+  }
+
+  @Override
+  public synchronized T addCounter(String counterName, String displayName,
+                                   long value) {
+    final String filteredName = Limits.filterCounterName(counterName);
+    final T existing = findCounterImpl(filteredName, false);
+    if (existing != null) {
+      existing.setValue(value);
+      return existing;
+    }
+    return addCounterImpl(filteredName, displayName, value);
+  }
+
+  private T addCounterImpl(String name, String displayName, long value) {
+    final T created = newCounter(name, displayName, value);
+    addCounter(created);
+    return created;
+  }
+
+  @Override
+  public synchronized T findCounter(String counterName, String displayName) {
+    // The lookup and the add happen under one lock so that two threads cannot
+    // both miss the counter and then both try to create it.
+    final String filteredName = Limits.filterCounterName(counterName);
+    final T existing = findCounterImpl(filteredName, false);
+    if (existing != null) {
+      return existing;
+    }
+    return addCounterImpl(filteredName, displayName, 0);
+  }
+
+  @Override
+  public T findCounter(String counterName, boolean create) {
+    return findCounterImpl(Limits.filterCounterName(counterName), create);
+  }
+
+  // Uses the object lock rather than a concurrent-map idiom such as
+  // putIfAbsent, because creation also involves localization, limits etc.
+  private synchronized T findCounterImpl(String counterName, boolean create) {
+    final T existing = counters.get(counterName);
+    if (existing != null || !create) {
+      return existing;
+    }
+    final String localizedName =
+        ResourceBundles.getCounterName(getName(), counterName, counterName);
+    return addCounterImpl(counterName, localizedName, 0);
+  }
+
+  @Override
+  public T findCounter(String counterName) {
+    return findCounter(counterName, true);
+  }
+
+  /**
+   * Factory hook: builds a counter of type T from explicit values.
+   * @param counterName of the counter
+   * @param displayName of the counter
+   * @param value of the counter
+   * @return a new counter
+   */
+  protected abstract T newCounter(String counterName, String displayName,
+                                  long value);
+
+  /**
+   * Factory hook: builds a blank counter of type T (readFields fills it in).
+   * @return a new counter object
+   */
+  protected abstract T newCounter();
+
+  @Override
+  public Iterator<T> iterator() {
+    return counters.values().iterator();
+  }
+
+  /**
+   * Wire format: GenericGroup ::= displayName #counter counter*
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    Text.writeString(out, displayName);
+    WritableUtils.writeVInt(out, counters.size());
+    for (TezCounter each : counters.values()) {
+      each.write(out);
+    }
+  }
+
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    displayName = Text.readString(in);
+    counters.clear();
+    final int count = WritableUtils.readVInt(in);
+    for (int remaining = count; remaining > 0; remaining--) {
+      final T restored = newCounter();
+      restored.readFields(in);
+      counters.put(restored.getName(), restored);
+      limits.incrCounters();
+    }
+  }
+
+  @Override
+  public synchronized int size() {
+    return counters.size();
+  }
+
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (!(genericRight instanceof CounterGroupBase<?>)) {
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    final CounterGroupBase<T> other = (CounterGroupBase<T>) genericRight;
+    return Iterators.elementsEqual(iterator(), other.iterator());
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return counters.hashCode();
+  }
+
+  @Override
+  public void incrAllCounters(CounterGroupBase<T> rightGroup) {
+    try {
+      for (TezCounter other : rightGroup) {
+        findCounter(other.getName(), other.getDisplayName())
+            .increment(other.getValue());
+      }
+    } catch (LimitExceededException e) {
+      counters.clear(); // drop every counter in this group before rethrowing
+      throw e;
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounters.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounters.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounters.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import static org.apache.tez.common.counters.CounterGroupFactory.getFrameworkGroupId;
+import static org.apache.tez.common.counters.CounterGroupFactory.isFrameworkGroup;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+
+/**
+ * An abstract class to provide common implementation for the Counters
+ * container in both mapred and mapreduce packages.
+ *
+ * @param <C> type of counter inside the counters
+ * @param <G> type of group inside the counters
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class AbstractCounters<C extends TezCounter,
+                                       G extends CounterGroupBase<C>>
+    implements Writable, Iterable<G> {
+
+  protected static final Log LOG = LogFactory.getLog("mapreduce.Counters");
+
+  /**
+   * A cache from enum values to the associated counter.
+   */
+  private Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
+  //framework & fs groups
+  private Map<String, G> fgroups = new ConcurrentSkipListMap<String, G>();
+  // other groups
+  private Map<String, G> groups = new ConcurrentSkipListMap<String, G>();
+  private final CounterGroupFactory<C, G> groupFactory;
+
+  // For framework counter serialization without strings.
+  // The ordinal of each constant is written by write(DataOutput) and used as
+  // an array index by readFields(DataInput), so reordering or inserting
+  // constants changes the serialized form.
+  enum GroupType { FRAMEWORK, FILESYSTEM };
+
+  // Writes only framework and fs counters if false.
+  private boolean writeAllCounters = true;
+
+  private static final Map<String, String> legacyMap = Maps.newHashMap();
+  static {
+    legacyMap.put("org.apache.hadoop.mapred.Task$Counter",
+                  TaskCounter.class.getName());
+    legacyMap.put("org.apache.hadoop.mapred.JobInProgress$Counter",
+                  JobCounter.class.getName());
+    legacyMap.put("FileSystemCounters", FileSystemCounter.class.getName());
+  }
+
+  private final Limits limits = new Limits();
+
+  /**
+   * Create a counters container with no groups.
+   * @param gf the factory used to create counter groups
+   */
+  @InterfaceAudience.Private
+  public AbstractCounters(CounterGroupFactory<C, G> gf) {
+    groupFactory = gf;
+  }
+
+  /**
+   * Construct from another counters object.
+   * @param <C1> type of the other counter
+   * @param <G1> type of the other counter group
+   * @param counters the counters object to copy
+   * @param groupFactory the factory for new groups
+   */
+  @InterfaceAudience.Private
+  public <C1 extends TezCounter, G1 extends CounterGroupBase<C1>>
+  AbstractCounters(AbstractCounters<C1, G1> counters,
+                   CounterGroupFactory<C, G> groupFactory) {
+    this.groupFactory = groupFactory;
+    for(G1 group: counters) {
+      String name = group.getName();
+      G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
+      (isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
+      for(TezCounter counter: group) {
+        newGroup.addCounter(counter.getName(), counter.getDisplayName(),
+                            counter.getValue());
+      }
+    }
+  }
+
+  /**
+   * Register a group under its own name, replacing any group already
+   * registered under that name. The group-count limit is checked only for
+   * non-framework groups.
+   * @param group object to add
+   * @return the group
+   */
+  @InterfaceAudience.Private
+  public synchronized G addGroup(G group) {
+    final String groupName = group.getName();
+    if (!isFrameworkGroup(groupName)) {
+      limits.checkGroups(groups.size() + 1);
+      groups.put(groupName, group);
+    } else {
+      fgroups.put(groupName, group);
+    }
+    return group;
+  }
+
+  /**
+   * Create a group through the group factory and register it.
+   * @param name of the group
+   * @param displayName of the group
+   * @return the group
+   */
+  @InterfaceAudience.Private
+  public G addGroup(String name, String displayName) {
+    final G created = groupFactory.newGroup(name, displayName, limits);
+    return addGroup(created);
+  }
+
+  /**
+   * Find a counter, create one if necessary. The group is resolved with
+   * {@link #getGroup(String)}.
+   * @param groupName of the counter
+   * @param counterName name of the counter
+   * @return the matching counter
+   */
+  public C findCounter(String groupName, String counterName) {
+    return getGroup(groupName).findCounter(counterName);
+  }
+
+  /**
+   * Find the counter for the given enum. The result is remembered per enum
+   * constant, so later calls with the same constant return the remembered
+   * counter. On a miss the counter is looked up by the enum's declaring
+   * class name (group) and constant name (counter).
+   * @param key the counter key
+   * @return the matching counter object
+   */
+  public synchronized C findCounter(Enum<?> key) {
+    final C cached = cache.get(key);
+    if (cached != null) {
+      return cached;
+    }
+    final String groupName = key.getDeclaringClass().getName();
+    final C resolved = findCounter(groupName, key.name());
+    cache.put(key, resolved);
+    return resolved;
+  }
+
+  /**
+   * Find the file system counter for the given scheme and enum. The lookup
+   * is delegated to the underlying group of the group named after
+   * {@link FileSystemCounter}.
+   * @param scheme of the file system
+   * @param key the enum of the counter
+   * @return the file system counter
+   */
+  @InterfaceAudience.Private
+  public synchronized C findCounter(String scheme, FileSystemCounter key) {
+    final G fsGroup = getGroup(FileSystemCounter.class.getName());
+    final FileSystemCounterGroup<C> underlying =
+        (FileSystemCounterGroup<C>) fsGroup.getUnderlyingGroup();
+    return underlying.findCounter(scheme, key);
+  }
+
+  /**
+   * Returns the names of all groups: framework groups first, then the other
+   * groups, then every legacy name whose replacement group is currently
+   * present.
+   * @return Set of counter names.
+   */
+  public synchronized Iterable<String> getGroupNames() {
+    final HashSet<String> legacyNames = new HashSet<String>();
+    for (Map.Entry<String, String> mapping : legacyMap.entrySet()) {
+      final String currentName = mapping.getValue();
+      final Map<String, G> holder =
+          isFrameworkGroup(currentName) ? fgroups : groups;
+      if (holder.containsKey(currentName)) {
+        legacyNames.add(mapping.getKey());
+      }
+    }
+    return Iterables.concat(fgroups.keySet(), groups.keySet(), legacyNames);
+  }
+
+  /**
+   * Iterates over the framework and file system groups first, followed by
+   * the other groups.
+   */
+  @Override
+  public Iterator<G> iterator() {
+    return Iterators.concat(fgroups.values().iterator(),
+                            groups.values().iterator());
+  }
+
+  /**
+   * Returns the named counter group. When no such group exists yet, a new
+   * group is created through the group factory, registered in this
+   * container and returned.
+   *
+   * A name found in the legacy map is translated to its replacement, and a
+   * deprecation warning is logged when that lookup creates the group. Any
+   * other name is passed through {@link Limits#filterGroupName(String)}.
+   * @param groupName name of the group
+   * @return the group
+   */
+  public synchronized G getGroup(String groupName) {
+    final String legacyTarget = legacyMap.get(groupName);
+    final boolean deprecatedName = legacyTarget != null;
+    final String resolvedName =
+        deprecatedName ? legacyTarget : Limits.filterGroupName(groupName);
+
+    final boolean framework = isFrameworkGroup(resolvedName);
+    final Map<String, G> holder = framework ? fgroups : groups;
+    final G existing = holder.get(resolvedName);
+    if (existing != null) {
+      return existing;
+    }
+
+    final G created = groupFactory.newGroup(resolvedName, limits);
+    if (!framework) {
+      // Only non-framework groups count against the group limit.
+      limits.checkGroups(groups.size() + 1);
+    }
+    holder.put(resolvedName, created);
+    if (deprecatedName) {
+      LOG.warn("Group " + groupName + " is deprecated. Use " + resolvedName
+          + " instead");
+    }
+    return created;
+  }
+
+  /**
+   * Returns the total number of counters, obtained by adding up the size of
+   * every group in this container.
+   * @return the total number of counters
+   */
+  public synchronized int countCounters() {
+    int total = 0;
+    for (Iterator<G> it = iterator(); it.hasNext();) {
+      total += it.next().size();
+    }
+    return total;
+  }
+
+  /**
+   * Write the set of groups.
+   * Counters ::= version #fgroups (groupType [groupId] group)*
+   *              #groups (name group)*
+   *
+   * Each framework group is preceded by its {@link GroupType} ordinal and
+   * its framework group id; each file system group is preceded by its
+   * {@link GroupType} ordinal only. The other groups are preceded by their
+   * name, and are replaced by a count of zero when the "writeAllCounters"
+   * option is false.
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, groupFactory.version());
+    WritableUtils.writeVInt(out, fgroups.size());  // framework groups first
+    for (G group : fgroups.values()) {
+      // A group whose underlying type matches neither branch is included in
+      // the count written above but produces no output here.
+      if (group.getUnderlyingGroup() instanceof FrameworkCounterGroup<?, ?>) {
+        WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
+        WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
+        group.write(out);
+      } else if (group.getUnderlyingGroup() instanceof FileSystemCounterGroup<?>) {
+        WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
+        group.write(out);
+      }
+    }
+    if (writeAllCounters) {
+      WritableUtils.writeVInt(out, groups.size());
+      for (G group : groups.values()) {
+        Text.writeString(out, group.getName());
+        group.write(out);
+      }
+    } else {
+      // Keep the stream shape stable: an empty list of other groups.
+      WritableUtils.writeVInt(out, 0);
+    }
+  }
+
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    int version = WritableUtils.readVInt(in);
+    if (version != groupFactory.version()) {
+      throw new IOException("Counters version mismatch, expected "+
+          groupFactory.version() +" got "+ version);
+    }
+    int numFGroups = WritableUtils.readVInt(in);
+    fgroups.clear();
+    GroupType[] groupTypes = GroupType.values();
+    while (numFGroups-- > 0) {
+      GroupType groupType = groupTypes[WritableUtils.readVInt(in)];
+      G group;
+      switch (groupType) {
+        case FILESYSTEM: // with nothing
+          group = groupFactory.newFileSystemGroup();
+          break;
+        case FRAMEWORK:  // with group id
+          group = groupFactory.newFrameworkGroup(WritableUtils.readVInt(in));
+          break;
+        default: // Silence dumb compiler, as it would've thrown earlier
+          throw new IOException("Unexpected counter group type: "+ groupType);
+      }
+      group.readFields(in);
+      fgroups.put(group.getName(), group);
+    }
+    int numGroups = WritableUtils.readVInt(in);
+    while (numGroups-- > 0) {
+      limits.checkGroups(groups.size() + 1);
+      G group = groupFactory.newGenericGroup(Text.readString(in), null, limits);
+      group.readFields(in);
+      groups.put(group.getName(), group);
+    }
+  }
+
+  /**
+   * Return textual representation of the counter values: a header line with
+   * the total counter count, then one line per group display name, each
+   * followed by one "displayName=value" line per counter.
+   * @return the string
+   */
+  @Override
+  public synchronized String toString() {
+    final StringBuilder text =
+        new StringBuilder("Counters: " + countCounters());
+    for (G group : this) {
+      text.append("\n\t");
+      text.append(group.getDisplayName());
+      for (TezCounter counter : group) {
+        text.append("\n\t\t");
+        text.append(counter.getDisplayName());
+        text.append("=");
+        text.append(counter.getValue());
+      }
+    }
+    return text.toString();
+  }
+
+  /**
+   * Increments multiple counters by their amounts in another Counters
+   * instance. A group of {@code other} that has no counterpart here is
+   * added first, under the same name and display name.
+   * @param other the other Counters instance
+   */
+  public synchronized void incrAllCounters(AbstractCounters<C, G> other) {
+    for (G source : other) {
+      final String groupName = source.getName();
+      final Map<String, G> holder =
+          isFrameworkGroup(groupName) ? fgroups : groups;
+      G target = holder.get(groupName);
+      if (target == null) {
+        target = addGroup(groupName, source.getDisplayName());
+      }
+      target.incrAllCounters(source);
+    }
+  }
+
+  /**
+   * Two counters containers are equal when they yield equal groups in the
+   * same iteration order. An object that is not an
+   * {@link AbstractCounters} is never equal to this container.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean equals(Object genericRight) {
+    if (!(genericRight instanceof AbstractCounters<?, ?>)) {
+      return false;
+    }
+    final AbstractCounters<C, G> other =
+        (AbstractCounters<C, G>) genericRight;
+    return Iterators.elementsEqual(iterator(), other.iterator());
+  }
+
+  @Override
+  public int hashCode() {
+    return groups.hashCode();
+  }
+
+  /**
+   * Set the "writeAllCounters" option to true or false
+   * @param send  if true all counters would be serialized, otherwise only
+   *              framework and file system counters would be serialized in
+   *              {@link #write(DataOutput)}
+   */
+  @InterfaceAudience.Private
+  public void setWriteAllCounters(boolean send) {
+    writeAllCounters = send;
+  }
+
+  /**
+   * Get the "writeAllCounters" option
+   * @return true if all counters would be serialized
+   */
+  @InterfaceAudience.Private
+  public boolean getWriteAllCounters() {
+    return writeAllCounters;
+  }
+
+  /**
+   * @return the limits object of this container, the same instance that is
+   *         handed to the group factory when groups are created
+   */
+  @InterfaceAudience.Private
+  public Limits limits() {
+    return limits;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroup.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroup.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroup.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroup.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A group of {@link TezCounter}s that logically belong together. Typically,
+ * it is an {@link Enum} subclass and the counters are the values.
+ *
+ * This interface declares no members of its own; it fixes the type
+ * parameter of {@link CounterGroupBase} to {@link TezCounter}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface CounterGroup extends CounterGroupBase<TezCounter> {
+  // essentially a typedef so users don't have to use generic syntax
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroup.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The common counter group interface. A group is a named, writable and
+ * iterable collection of counters.
+ *
+ * @param <T> type of the counter for the group
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface CounterGroupBase<T extends TezCounter>
+    extends Writable, Iterable<T> {
+
+  /**
+   * Get the internal name of the group
+   * @return the internal name
+   */
+  String getName();
+
+  /**
+   * Get the display name of the group.
+   * @return the human readable name
+   */
+  String getDisplayName();
+
+  /**
+   * Set the display name of the group
+   * @param displayName of the group
+   */
+  void setDisplayName(String displayName);
+
+  /**
+   * Add a counter to this group.
+   * @param counter to add
+   */
+  void addCounter(T counter);
+
+  /**
+   * Add a counter to this group
+   * @param name  of the counter
+   * @param displayName of the counter
+   * @param value of the counter
+   * @return the counter
+   */
+  T addCounter(String name, String displayName, long value);
+
+  /**
+   * Find a counter in the group.
+   * @param counterName the name of the counter
+   * @param displayName the display name of the counter
+   * @return the counter that was found or added
+   */
+  T findCounter(String counterName, String displayName);
+
+  /**
+   * Find a counter in the group
+   * @param counterName the name of the counter
+   * @param create if true, create the counter when it is not found
+   * @return the counter that was found or added, or null if it was not
+   *         found and create is false
+   */
+  T findCounter(String counterName, boolean create);
+
+  /**
+   * Find a counter in the group.
+   * @param counterName the name of the counter
+   * @return the counter that was found or added
+   */
+  T findCounter(String counterName);
+
+  /**
+   * @return the number of counters in this group.
+   */
+  int size();
+
+  /**
+   * Increment all counters by a group of counters
+   * @param rightGroup  the group to be added to this group
+   */
+  void incrAllCounters(CounterGroupBase<T> rightGroup);
+
+  /**
+   * Exposes the underlying group type if a facade.
+   * @return the underlying object that this object is wrapping up.
+   */
+  @Private
+  CounterGroupBase<T> getUnderlyingGroup();
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupFactory.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupFactory.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupFactory.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An abstract class to provide common implementation of the
+ * group factory in both mapred and mapreduce packages.
+ *
+ * @param <C> type of the counter
+ * @param <G> type of the group
+ */
+@InterfaceAudience.Private
+public abstract class CounterGroupFactory<C extends TezCounter,
+                                          G extends CounterGroupBase<C>> {
+
+  /**
+   * Creates the group for one registered framework counter class.
+   * @param <F> type of the group that is produced
+   */
+  public interface FrameworkGroupFactory<F> {
+    /**
+     * @param name of the group to create
+     * @return a new group
+     */
+    F newGroup(String name);
+  }
+
+  // Integer mapping (for serialization) for framework groups
+  private static final Map<String, Integer> s2i = Maps.newHashMap();
+  private static final List<String> i2s = Lists.newArrayList();
+  private static final int VERSION = 1;
+  private static final String FS_GROUP_NAME = FileSystemCounter.class.getName();
+
+  private final Map<String, FrameworkGroupFactory<G>> fmap = Maps.newHashMap();
+  {
+    // Add builtin counter class here and the version when changed.
+    // This initializer runs before any subclass constructor body, and
+    // addFrameworkGroup invokes the abstract newFrameworkGroupFactory, so
+    // implementations of that method must not depend on subclass state.
+    addFrameworkGroup(TaskCounter.class);
+    addFrameworkGroup(JobCounter.class);
+    addFrameworkGroup(DAGCounter.class);
+  }
+
+  // Initialize the framework counter group mapping: make sure the class
+  // name has an id in the static tables, then register the per-instance
+  // factory that creates the group for that class name.
+  private synchronized <T extends Enum<T>>
+  void addFrameworkGroup(final Class<T> cls) {
+    updateFrameworkGroupMapping(cls);
+    fmap.put(cls.getName(), newFrameworkGroupFactory(cls));
+  }
+
+  // Update static mappings (c2i, i2s) of framework groups
+  private static synchronized void updateFrameworkGroupMapping(Class<?> cls) {
+    String name = cls.getName();
+    Integer i = s2i.get(name);
+    if (i != null) return;
+    i2s.add(name);
+    s2i.put(name, i2s.size() - 1);
+  }
+
+  /**
+   * Required override to return a new framework group factory. It is
+   * invoked once per builtin counter class while this factory object is
+   * being initialized.
+   * @param <T> type of the counter enum class
+   * @param cls the counter enum class
+   * @return a new framework group factory
+   */
+  protected abstract <T extends Enum<T>>
+  FrameworkGroupFactory<G> newFrameworkGroupFactory(Class<T> cls);
+
+  /**
+   * Create a new counter group whose display name is looked up through
+   * {@code ResourceBundles.getCounterGroupName}, with the group name
+   * itself passed as the second argument of that lookup.
+   * @param name of the group
+   * @param limits the counters limits policy object
+   * @return a new counter group
+   */
+  public G newGroup(String name, Limits limits) {
+    final String displayName = ResourceBundles.getCounterGroupName(name, name);
+    return newGroup(name, displayName, limits);
+  }
+
+  /**
+   * Create a new counter group
+   * @param name of the group
+   * @param displayName of the group
+   * @param limits the counters limits policy object
+   * @return a new counter group
+   */
+  public G newGroup(String name, String displayName, Limits limits) {
+    FrameworkGroupFactory<G> gf = fmap.get(name);
+    if (gf != null) return gf.newGroup(name);
+    if (name.equals(FS_GROUP_NAME)) {
+      return newFileSystemGroup();
+    } else if (s2i.get(name) != null) {
+      return newFrameworkGroup(s2i.get(name));
+    }
+    return newGenericGroup(name, displayName, limits);
+  }
+
+  /**
+   * Create a new framework group from its serialization id.
+   * @param id of the group
+   * @return a new framework group
+   * @throws IllegalArgumentException if the id is outside the static id
+   *         table or no factory is registered for the name it maps to
+   */
+  public G newFrameworkGroup(int id) {
+    final String groupName;
+    synchronized (CounterGroupFactory.class) {
+      if (id < 0 || id >= i2s.size()) {
+        throwBadFrameGroupIdException(id);
+      }
+      groupName = i2s.get(id); // should not throw here.
+    }
+    final FrameworkGroupFactory<G> factory = fmap.get(groupName);
+    if (factory == null) {
+      throwBadFrameGroupIdException(id);
+    }
+    return factory.newGroup(groupName);
+  }
+
+  /**
+   * Get the id of a framework group
+   * @param name of the group
+   * @return the framework group id
+   * @throws IllegalArgumentException if the name has no id
+   */
+  public static synchronized int getFrameworkGroupId(String name) {
+    final Integer id = s2i.get(name);
+    if (id == null) {
+      throwBadFrameworkGroupNameException(name);
+    }
+    return id.intValue();
+  }
+
+  /**
+   * @return the counter factory version; AbstractCounters writes it as the
+   *         first value of its serialized form and rejects any other value
+   *         when reading
+   */
+  public int version() {
+    return VERSION;
+  }
+
+  /**
+   * Check whether a group name is a name of a framework group (including
+   * the filesystem group).
+   *
+   * @param name  to check
+   * @return true for framework group names
+   */
+  public static synchronized boolean isFrameworkGroup(String name) {
+    if (s2i.get(name) != null) {
+      return true;
+    }
+    return name.equals(FS_GROUP_NAME);
+  }
+
+  // Always throws; reports a framework group id that cannot be resolved.
+  private static void throwBadFrameGroupIdException(int id) {
+    throw new IllegalArgumentException("bad framework group id: "+ id);
+  }
+
+  // Always throws; reports a group name that has no framework group id.
+  private static void throwBadFrameworkGroupNameException(String name) {
+    throw new IllegalArgumentException("bad framework group name: "+ name);
+  }
+
+  /**
+   * Abstract factory method to create a generic (vs framework) counter group.
+   * {@link #newGroup(String, String, Limits)} falls back to it for names
+   * that match neither a framework group nor the file system group.
+   * @param name  of the group
+   * @param displayName of the group
+   * @param limits limits of the counters
+   * @return a new generic counter group
+   */
+  protected abstract G newGenericGroup(String name, String displayName,
+                                       Limits limits);
+
+  /**
+   * Abstract factory method to create a file system counter group.
+   * {@link #newGroup(String, String, Limits)} uses it for the group named
+   * after the {@link FileSystemCounter} class.
+   * @return a new file system counter group
+   */
+  protected abstract G newFileSystemGroup();
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/CounterGroupFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/DAGCounter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/DAGCounter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/DAGCounter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/DAGCounter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Per-job counters. CounterGroupFactory registers this enum as one of the
+ * builtin framework counter groups.
+ *
+ * NOTE(review): the enum is named after a DAG while the original comment
+ * says "per-job"; confirm the intended scope.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum DAGCounter {
+  NUM_FAILED_TASKS,
+  NUM_KILLED_TASKS,
+  TOTAL_LAUNCHED_TASKS,
+  OTHER_LOCAL_TASKS,
+  DATA_LOCAL_TASKS,
+  RACK_LOCAL_TASKS,
+  SLOTS_MILLIS_TASKS,
+  FALLOW_SLOTS_MILLIS_TASKS,
+  TOTAL_LAUNCHED_UBERTASKS,
+  NUM_UBER_SUBTASKS,
+  NUM_FAILED_UBERTASKS
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/DAGCounter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Keys of the file system counters. The name of this class is also the name
+ * of the file system counter group used by CounterGroupFactory and
+ * AbstractCounters.
+ */
+@InterfaceAudience.Private
+public enum FileSystemCounter {
+  BYTES_READ,
+  BYTES_WRITTEN,
+  READ_OPS,
+  LARGE_READ_OPS,
+  WRITE_OPS,
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import static com.google.common.base.Preconditions.*;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * An abstract class to provide common implementation of the filesystem
+ * counter group in both mapred and mapreduce packages.
+ *
+ * <p>Counters are kept per file system scheme. Each scheme maps to an array
+ * with one slot per {@link FileSystemCounter} constant (indexed by ordinal);
+ * a slot stays {@code null} until that counter is first looked up.
+ *
+ * @param <C> the type of the Counter for the group
+ */
+@InterfaceAudience.Private
+public abstract class FileSystemCounterGroup<C extends TezCounter>
+    implements CounterGroupBase<C> {
+
+  static final int MAX_NUM_SCHEMES = 100; // intern/sanity check
+  // Registry of canonical (upper-cased) scheme strings, shared by all groups.
+  static final ConcurrentMap<String, String> schemes = Maps.newConcurrentMap();
+
+  // C[] would need Array.newInstance which requires a Class<C> reference.
+  // Just a few local casts probably worth not having to carry it around.
+  private final Map<String, Object[]> map =
+    new ConcurrentSkipListMap<String, Object[]>();
+  private String displayName;
+
+  private static final Joiner NAME_JOINER = Joiner.on('_');
+  private static final Joiner DISP_JOINER = Joiner.on(": ");
+
+  /**
+   * A counter for one (scheme, {@link FileSystemCounter}) pair.
+   * Its value is serialized by the enclosing group, so the counter's own
+   * {@code write}/{@code readFields} are not expected to be called.
+   */
+  @InterfaceAudience.Private
+  public static class FSCounter extends AbstractCounter {
+    final String scheme;
+    final FileSystemCounter key;
+    private long value;
+
+    /**
+     * @param scheme the file system scheme this counter belongs to
+     * @param ref the kind of file system counter
+     */
+    public FSCounter(String scheme, FileSystemCounter ref) {
+      this.scheme = scheme;
+      key = ref;
+    }
+
+    /** @return the scheme and the enum name joined by {@code '_'} */
+    @Override
+    public String getName() {
+      return NAME_JOINER.join(scheme, key.name());
+    }
+
+    /** @return the scheme and the localized counter name joined by ": " */
+    @Override
+    public String getDisplayName() {
+      return DISP_JOINER.join(scheme, localizeCounterName(key.name()));
+    }
+
+    /**
+     * Looks up the display name of a counter in the resource bundle of
+     * {@link FileSystemCounter}, falling back to the given name.
+     * @param counterName the enum name of the counter
+     * @return the localized name, or {@code counterName} as the default
+     */
+    protected String localizeCounterName(String counterName) {
+      return ResourceBundles.getCounterName(FileSystemCounter.class.getName(),
+                                            counterName, counterName);
+    }
+
+    @Override
+    public long getValue() {
+      return value;
+    }
+
+    @Override
+    public void setValue(long value) {
+      this.value = value;
+    }
+
+    @Override
+    public void increment(long incr) {
+      value += incr;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      assert false : "shouldn't be called";
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      assert false : "shouldn't be called";
+    }
+
+    @Override
+    public TezCounter getUnderlyingCounter() {
+      return this;
+    }
+  }
+
+  /** @return the class name of {@link FileSystemCounter} */
+  @Override
+  public String getName() {
+    return FileSystemCounter.class.getName();
+  }
+
+  /**
+   * @return the display name; resolved from the resource bundle (default
+   *         "File System Counters") on first use unless one was set
+   */
+  @Override
+  public String getDisplayName() {
+    if (displayName == null) {
+      displayName = ResourceBundles.getCounterGroupName(getName(),
+          "File System Counters");
+    }
+    return displayName;
+  }
+
+  @Override
+  public void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  /**
+   * Copies the value of the given counter into this group's counter of the
+   * same scheme and key, creating it if needed.
+   * @param counter the counter whose value is copied
+   */
+  @Override
+  public void addCounter(C counter) {
+    C ours;
+    if (counter instanceof FileSystemCounterGroup.FSCounter) {
+      FSCounter c = (FSCounter) counter;
+      ours = findCounter(c.scheme, c.key);
+    }
+    else {
+      // Not one of ours: fall back to parsing its "SCHEME_KEY" name.
+      ours = findCounter(counter.getName());
+    }
+    ours.setValue(counter.getValue());
+  }
+
+  /**
+   * Finds (or creates) the counter with the given name and sets its value.
+   * @param name the counter name in "SCHEME_KEY" form
+   * @param displayName ignored by this implementation
+   * @param value the value to set
+   * @return the counter
+   */
+  @Override
+  public C addCounter(String name, String displayName, long value) {
+    C counter = findCounter(name);
+    counter.setValue(value);
+    return counter;
+  }
+
+  // Parse generic counter name into [scheme, key]; the scheme is everything
+  // before the first '_'.
+  private String[] parseCounterName(String counterName) {
+    int schemeEnd = counterName.indexOf('_');
+    if (schemeEnd < 0) {
+      throw new IllegalArgumentException("bad fs counter name");
+    }
+    return new String[]{counterName.substring(0, schemeEnd),
+                        counterName.substring(schemeEnd + 1)};
+  }
+
+  @Override
+  public C findCounter(String counterName, String displayName) {
+    return findCounter(counterName);
+  }
+
+  /**
+   * Finds the counter with the given "SCHEME_KEY" name. A name that parses
+   * successfully always yields a counter (it is created if absent).
+   * @param counterName the counter name
+   * @param create if true, a name that cannot be resolved raises
+   *        {@link IllegalArgumentException}; if false, null is returned
+   * @return the counter, or null when the name is bad and create is false
+   */
+  @Override
+  public C findCounter(String counterName, boolean create) {
+    try {
+      String[] pair = parseCounterName(counterName);
+      return findCounter(pair[0], FileSystemCounter.valueOf(pair[1]));
+    }
+    catch (Exception e) {
+      if (create) throw new IllegalArgumentException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public C findCounter(String counterName) {
+    return findCounter(counterName, true);
+  }
+
+  /**
+   * Finds the counter for the given scheme and key, creating the per-scheme
+   * array and the counter itself on first use.
+   * @param scheme the file system scheme (case-insensitive)
+   * @param key the kind of file system counter
+   * @return the counter
+   * @throws IllegalArgumentException if the scheme would exceed
+   *         {@link #MAX_NUM_SCHEMES} distinct schemes
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized C findCounter(String scheme, FileSystemCounter key) {
+    final String canonicalScheme = checkScheme(scheme);
+    Object[] counters = map.get(canonicalScheme);
+    int ord = key.ordinal();
+    if (counters == null) {
+      counters = new Object[FileSystemCounter.values().length];
+      map.put(canonicalScheme, counters);
+    }
+    if (counters[ord] == null) {
+      counters[ord] = newCounter(canonicalScheme, key);
+    }
+    return (C) counters[ord];
+  }
+
+  /**
+   * Canonicalizes (upper-cases and interns) a scheme name.
+   *
+   * <p>A scheme that is already registered is always accepted. A new scheme
+   * that would push the registry past {@link #MAX_NUM_SCHEMES} is rejected
+   * and removed again, so that one bad scheme does not make every later
+   * lookup of a known scheme fail.
+   *
+   * @param scheme the scheme as supplied by the caller
+   * @return the canonical scheme string
+   * @throws IllegalArgumentException if there are too many distinct schemes
+   */
+  private String checkScheme(String scheme) {
+    String fixed = scheme.toUpperCase(Locale.US);
+    String interned = schemes.putIfAbsent(fixed, fixed);
+    if (interned != null) {
+      return interned;
+    }
+    int numSchemes = schemes.size();
+    if (numSchemes > MAX_NUM_SCHEMES) {
+      // mistakes or abuses; undo the registration before reporting it
+      schemes.remove(fixed);
+      throw new IllegalArgumentException("too many schemes? "+ numSchemes +
+                                         " when process scheme: "+ scheme);
+    }
+    return fixed;
+  }
+
+  /**
+   * Abstract factory method to create a file system counter
+   * @param scheme of the file system
+   * @param key the enum of the file system counter
+   * @return a new file system counter
+   */
+  protected abstract C newCounter(String scheme, FileSystemCounter key);
+
+  /** @return the number of counters that have been created in this group */
+  @Override
+  public int size() {
+    int n = 0;
+    for (Object[] counters : map.values()) {
+      n += numSetCounters(counters);
+    }
+    return n;
+  }
+
+  /**
+   * Adds the value of every counter in the other group to the matching
+   * counter here. Does nothing unless the other group's underlying group is
+   * a file system counter group.
+   * @param other the group to add from; its underlying group must not be null
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void incrAllCounters(CounterGroupBase<C> other) {
+    if (checkNotNull(other.getUnderlyingGroup(), "other group")
+        instanceof FileSystemCounterGroup<?>) {
+      for (TezCounter counter : other) {
+        FSCounter c = (FSCounter) counter.getUnderlyingCounter();
+        findCounter(c.scheme, c.key).increment(counter.getValue());
+      }
+    }
+  }
+
+  /**
+   * FileSystemGroup ::= #scheme (scheme #counter (key value)*)*
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, map.size()); // #scheme
+    for (Map.Entry<String, Object[]> entry : map.entrySet()) {
+      WritableUtils.writeString(out, entry.getKey()); // scheme
+      // #counter for the above scheme
+      WritableUtils.writeVInt(out, numSetCounters(entry.getValue()));
+      for (Object counter : entry.getValue()) {
+        if (counter == null) continue;
+        FSCounter c = (FSCounter) ((TezCounter)counter).getUnderlyingCounter();
+        WritableUtils.writeVInt(out, c.key.ordinal());  // key
+        WritableUtils.writeVLong(out, c.getValue());    // value
+      }
+    }
+  }
+
+  // Counts the non-null slots of a per-scheme counter array.
+  private int numSetCounters(Object[] counters) {
+    int n = 0;
+    for (Object counter : counters) if (counter != null) ++n;
+    return n;
+  }
+
+  /**
+   * Reads counters in the format produced by {@link #write(DataOutput)} and
+   * sets the value of each one read; counters not mentioned in the input
+   * are left as they are.
+   * @throws IOException on read failure or if a counter key read from the
+   *         input is not a valid {@link FileSystemCounter} ordinal
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numSchemes = WritableUtils.readVInt(in);    // #scheme
+    FileSystemCounter[] enums = FileSystemCounter.values();
+    for (int i = 0; i < numSchemes; ++i) {
+      String scheme = WritableUtils.readString(in); // scheme
+      int numCounters = WritableUtils.readVInt(in); // #counter
+      for (int j = 0; j < numCounters; ++j) {
+        int ord = WritableUtils.readVInt(in);       // key
+        if (ord < 0 || ord >= enums.length) {
+          throw new IOException("invalid file system counter key " + ord +
+                                " for scheme: " + scheme);
+        }
+        findCounter(scheme, enums[ord])
+            .setValue(WritableUtils.readVLong(in)); // value
+      }
+    }
+  }
+
+  /**
+   * @return an iterator over the created counters, scheme by scheme in the
+   *         order of the underlying sorted map, skipping empty slots
+   */
+  @Override
+  public Iterator<C> iterator() {
+    return new AbstractIterator<C>() {
+      Iterator<Object[]> it = map.values().iterator();
+      Object[] counters = it.hasNext() ? it.next() : null;
+      int i = 0;
+      @Override
+      protected C computeNext() {
+        while (counters != null) {
+          while (i < counters.length) {
+            @SuppressWarnings("unchecked")
+            C counter = (C) counters[i++];
+            if (counter != null) return counter;
+          }
+          // Current scheme exhausted: move on to the next one, if any.
+          i = 0;
+          counters = it.hasNext() ? it.next() : null;
+        }
+        return endOfData();
+      }
+    };
+  }
+
+  /**
+   * Two groups are equal when they iterate over equal counters in the same
+   * order; the other object may be any {@link CounterGroupBase}.
+   */
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (genericRight instanceof CounterGroupBase<?>) {
+      @SuppressWarnings("unchecked")
+      CounterGroupBase<C> right = (CounterGroupBase<C>) genericRight;
+      return Iterators.elementsEqual(iterator(), right.iterator());
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    // need to be deep as counters is an array
+    int hash = FileSystemCounter.class.hashCode();
+    for (Object[] counters : map.values()) {
+      if (counters != null) hash ^= Arrays.hashCode(counters);
+    }
+    return hash;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
+/**
+ * An abstract class to provide common implementation for the framework
+ * counter group in both mapred and mapreduce packages.
+ *
+ * <p>Counters are held in an array with one slot per constant of the enum
+ * class (indexed by ordinal); a slot stays {@code null} until that counter
+ * is first looked up or read from serialized input.
+ *
+ * @param <T> type of the counter enum class
+ * @param <C> type of the counter
+ */
+@InterfaceAudience.Private
+public abstract class FrameworkCounterGroup<T extends Enum<T>,
+    C extends TezCounter> implements CounterGroupBase<C> {
+  private static final Log LOG = LogFactory.getLog(FrameworkCounterGroup.class);
+
+  private final Class<T> enumClass; // for Enum.valueOf
+  private final Object[] counters;  // local casts are OK and save a class ref
+  private String displayName = null;
+
+  /**
+   * A counter facade for framework counters.
+   * Use old (which extends new) interface to make compatibility easier.
+   * Its value is serialized by the enclosing group, so the counter's own
+   * {@code write}/{@code readFields} are not expected to be called.
+   */
+  @InterfaceAudience.Private
+  public static class FrameworkCounter<T extends Enum<T>> extends AbstractCounter {
+    final T key;
+    final String groupName;
+    private long value;
+
+    /**
+     * @param ref the enum constant identifying the counter
+     * @param groupName the group name used to look up the display name
+     */
+    public FrameworkCounter(T ref, String groupName) {
+      key = ref;
+      this.groupName = groupName;
+    }
+
+    /** @return the name of the enum constant */
+    @Override
+    public String getName() {
+      return key.name();
+    }
+
+    /** @return the name from the group's resource bundle, else the enum name */
+    @Override
+    public String getDisplayName() {
+      return ResourceBundles.getCounterName(groupName, getName(), getName());
+    }
+
+    @Override
+    public long getValue() {
+      return value;
+    }
+
+    @Override
+    public void setValue(long value) {
+      this.value = value;
+    }
+
+    @Override
+    public void increment(long incr) {
+      value += incr;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      assert false : "shouldn't be called";
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      assert false : "shouldn't be called";
+    }
+
+    @Override
+    public TezCounter getUnderlyingCounter() {
+      return this;
+    }
+  }
+
+  /**
+   * Creates an empty group with one (unset) slot per enum constant.
+   * @param enumClass the enum class whose constants name the counters
+   */
+  public FrameworkCounterGroup(Class<T> enumClass) {
+    this.enumClass = enumClass;
+    T[] enums = enumClass.getEnumConstants();
+    counters = new Object[enums.length];
+  }
+
+  /** @return the class name of the counter enum */
+  @Override
+  public String getName() {
+    return enumClass.getName();
+  }
+
+  /**
+   * @return the display name; resolved from the resource bundle (default:
+   *         the group name) on first use unless one was set
+   */
+  @Override
+  public String getDisplayName() {
+    if (displayName == null) {
+      displayName = ResourceBundles.getCounterGroupName(getName(), getName());
+    }
+    return displayName;
+  }
+
+  @Override
+  public void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  // Resolves a counter name to its enum constant; throws
+  // IllegalArgumentException if the enum has no such constant.
+  private T valueOf(String name) {
+    return Enum.valueOf(enumClass, name);
+  }
+
+  /**
+   * Copies the value of the given counter into this group's counter of the
+   * same name, creating it if needed.
+   * @param counter the counter whose value is copied
+   */
+  @Override
+  public void addCounter(C counter) {
+    C ours = findCounter(counter.getName());
+    ours.setValue(counter.getValue());
+  }
+
+  /**
+   * Finds (or creates) the counter with the given name and sets its value.
+   * @param name the enum constant name of the counter
+   * @param displayName ignored by this implementation
+   * @param value the value to set
+   * @return the counter
+   */
+  @Override
+  public C addCounter(String name, String displayName, long value) {
+    C counter = findCounter(name);
+    counter.setValue(value);
+    return counter;
+  }
+
+  @Override
+  public C findCounter(String counterName, String displayName) {
+    return findCounter(counterName);
+  }
+
+  /**
+   * Finds the counter with the given enum constant name. A name that
+   * resolves always yields a counter (it is created if absent).
+   * @param counterName the counter name
+   * @param create if true, a name that cannot be resolved raises
+   *        {@link IllegalArgumentException}; if false, null is returned
+   * @return the counter, or null when the name is bad and create is false
+   */
+  @Override
+  public C findCounter(String counterName, boolean create) {
+    try {
+      return findCounter(valueOf(counterName));
+    }
+    catch (Exception e) {
+      if (create) throw new IllegalArgumentException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public C findCounter(String counterName) {
+    return findCounter(valueOf(counterName));
+  }
+
+  // Returns the counter for the key, creating it on first use.
+  @SuppressWarnings("unchecked")
+  private C findCounter(T key) {
+    int i = key.ordinal();
+    if (counters[i] == null) {
+      counters[i] = newCounter(key);
+    }
+    return (C) counters[i];
+  }
+
+  /**
+   * Abstract factory method for new framework counter
+   * @param key for the enum value of a counter
+   * @return a new counter for the key
+   */
+  protected abstract C newCounter(T key);
+
+  /** @return the number of counters that have been created in this group */
+  @Override
+  public int size() {
+    int n = 0;
+    for (int i = 0; i < counters.length; ++i) {
+      if (counters[i] != null) ++n;
+    }
+    return n;
+  }
+
+  /**
+   * Adds the value of every counter in the other group to the counter of the
+   * same name here. Does nothing unless the other group is itself a
+   * framework counter group.
+   * @param other the group to add from; must not be null
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void incrAllCounters(CounterGroupBase<C> other) {
+    if (checkNotNull(other, "other counter group")
+        instanceof FrameworkCounterGroup<?, ?>) {
+      for (TezCounter counter : other) {
+        // Go through the name so the key is re-validated against our enum.
+        findCounter(((FrameworkCounter<?>) counter).key.name())
+            .increment(counter.getValue());
+      }
+    }
+  }
+
+  /**
+   * FrameworkGroup ::= #counter (key value)*
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, size());
+    for (int i = 0; i < counters.length; ++i) {
+      TezCounter counter = (C) counters[i];
+      if (counter != null) {
+        WritableUtils.writeVInt(out, i);
+        WritableUtils.writeVLong(out, counter.getValue());
+      }
+    }
+  }
+
+  /**
+   * Replaces the contents of this group with the counters read in the
+   * format produced by {@link #write(DataOutput)}.
+   * @throws IOException on read failure or if a counter key read from the
+   *         input is not a valid ordinal of the counter enum
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    clear();
+    int len = WritableUtils.readVInt(in);
+    T[] enums = enumClass.getEnumConstants();
+    for (int i = 0; i < len; ++i) {
+      int ord = WritableUtils.readVInt(in);
+      if (ord < 0 || ord >= enums.length) {
+        throw new IOException("invalid counter key " + ord +
+                              " for group: " + getName());
+      }
+      TezCounter counter = newCounter(enums[ord]);
+      counter.setValue(WritableUtils.readVLong(in));
+      counters[ord] = counter;
+    }
+  }
+
+  // Drops all counters so every slot is unset again.
+  private void clear() {
+    Arrays.fill(counters, null);
+  }
+
+  /** @return an iterator over the created counters in enum ordinal order */
+  @Override
+  public Iterator<C> iterator() {
+    return new AbstractIterator<C>() {
+      int i = 0;
+      @Override
+      protected C computeNext() {
+        while (i < counters.length) {
+          @SuppressWarnings("unchecked")
+          C counter = (C) counters[i++];
+          if (counter != null) return counter;
+        }
+        return endOfData();
+      }
+    };
+  }
+
+  /**
+   * Two groups are equal when they iterate over equal counters in the same
+   * order; the other object may be any {@link CounterGroupBase}. The display
+   * name is not compared.
+   */
+  @Override
+  public boolean equals(Object genericRight) {
+    if (genericRight instanceof CounterGroupBase<?>) {
+      @SuppressWarnings("unchecked")
+      CounterGroupBase<C> right = (CounterGroupBase<C>) genericRight;
+      return Iterators.elementsEqual(iterator(), right.iterator());
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    // need to be deep as counters is an array.
+    // displayName is deliberately left out: equals() ignores it and
+    // getDisplayName() fills it in lazily, so hashing it would let equal
+    // groups hash differently and change the hash on a mere getter call.
+    return Arrays.deepHashCode(new Object[]{enumClass, counters});
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/GenericCounter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/GenericCounter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/GenericCounter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/GenericCounter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A generic counter implementation: a named {@code long} value with a
+ * display name. Methods that read or write the fields are
+ * {@code synchronized} on the counter.
+ */
+@InterfaceAudience.Private
+public class GenericCounter extends AbstractCounter {
+
+  private String name;
+  private String displayName;
+  private long value = 0;
+
+  /** Creates an unnamed counter with value 0. */
+  public GenericCounter() {
+    // mostly for readFields
+  }
+
+  /**
+   * Creates a counter with value 0.
+   * @param name the counter name
+   * @param displayName the name to show for the counter
+   */
+  public GenericCounter(String name, String displayName) {
+    this.name = name;
+    this.displayName = displayName;
+  }
+
+  /**
+   * @param name the counter name
+   * @param displayName the name to show for the counter
+   * @param value the initial value
+   */
+  public GenericCounter(String name, String displayName, long value) {
+    this.name = name;
+    this.displayName = displayName;
+    this.value = value;
+  }
+
+  @Override @Deprecated
+  public synchronized void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  /**
+   * Reads the counter in the format produced by {@link #write(DataOutput)}.
+   * When no distinct display name was written, the name is used for it.
+   */
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    name = Text.readString(in);
+    displayName = in.readBoolean() ? Text.readString(in) : name;
+    value = WritableUtils.readVLong(in);
+  }
+
+  /**
+   * GenericCounter ::= keyName isDistinctDisplayName [displayName] value
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    Text.writeString(out, name);
+    // A null display name is written as "not distinct" so that null is never
+    // handed to Text.writeString; readFields then restores it as the name.
+    boolean distinctDisplayName =
+        displayName != null && ! name.equals(displayName);
+    out.writeBoolean(distinctDisplayName);
+    if (distinctDisplayName) {
+      Text.writeString(out, displayName);
+    }
+    WritableUtils.writeVLong(out, value);
+  }
+
+  @Override
+  public synchronized String getName() {
+    return name;
+  }
+
+  @Override
+  public synchronized String getDisplayName() {
+    return displayName;
+  }
+
+  @Override
+  public synchronized long getValue() {
+    return value;
+  }
+
+  @Override
+  public synchronized void setValue(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public synchronized void increment(long incr) {
+    value += incr;
+  }
+
+  @Override
+  public TezCounter getUnderlyingCounter() {
+    return this;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/GenericCounter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/JobCounter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/JobCounter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/JobCounter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/JobCounter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+// Per-job counters.
+// NOTE(review): if this enum is used as the key type of a
+// FrameworkCounterGroup, counters are serialized by ordinal, so the
+// declaration order below would be part of the wire format - confirm before
+// reordering or removing constants.
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum JobCounter {
+  NUM_FAILED_MAPS, 
+  NUM_FAILED_REDUCES,
+  NUM_KILLED_MAPS,
+  NUM_KILLED_REDUCES,
+  TOTAL_LAUNCHED_MAPS,
+  TOTAL_LAUNCHED_REDUCES,
+  OTHER_LOCAL_MAPS,
+  DATA_LOCAL_MAPS,
+  RACK_LOCAL_MAPS,
+  SLOTS_MILLIS_MAPS,
+  SLOTS_MILLIS_REDUCES,
+  FALLOW_SLOTS_MILLIS_MAPS,
+  FALLOW_SLOTS_MILLIS_REDUCES,
+  TOTAL_LAUNCHED_UBERTASKS,
+  NUM_UBER_SUBMAPS,
+  NUM_UBER_SUBREDUCES,
+  NUM_FAILED_UBERTASKS
+}

Propchange: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/counters/JobCounter.java
------------------------------------------------------------------------------
    svn:eol-style = native