You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2011/12/16 15:21:00 UTC

svn commit: r1215141 [4/4] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ ivy/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/docs/src/documentation/content/xdocs/ src/test/mapred/org/apache/hadoop/tools/rumen/ src/tools/org...

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+
+/**
+ * Represents a queue name.
+ */
+public class QueueName extends DefaultAnonymizableDataType {
+  private final String queueName;
+  
+  /** Wraps the given queue name; the value is stored as-is. */
+  public QueueName(String queueName) {
+    this.queueName = queueName;
+  }
+  
+  @Override
+  public String getValue() {
+    return queueName;
+  }
+  
+  @Override
+  protected String getPrefix() {
+    return "queue";
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+/**
+ * A data type holding the name of a user.
+ */
+public class UserName extends DefaultAnonymizableDataType {
+  private final String name;
+  
+  /** Creates a {@link UserName} around the given raw user name. */
+  public UserName(String userName) {
+    name = userName;
+  }
+  
+  @Override
+  protected String getPrefix() {
+    return "user";
+  }
+  
+  @Override
+  public String getValue() {
+    return name;
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes.util;
+
+import org.apache.hadoop.tools.rumen.datatypes.DataType;
+import org.apache.hadoop.tools.rumen.datatypes.DefaultDataType;
+
+/** A pass-through job property parser: wraps every value unmodified. */
+public class DefaultJobPropertiesParser implements JobPropertyParser {
+  /** Ignores the key and wraps the raw value in a {@link DefaultDataType}. */
+  @Override
+  public DataType<?> parseJobProperty(String key, String value) {
+    final DataType<?> wrapped = new DefaultDataType(value);
+    return wrapped;
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes.util;
+
+import org.apache.hadoop.tools.rumen.datatypes.DataType;
+import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
+
+/** A {@link JobProperties} parsing utility. */
+public interface JobPropertyParser {
+  /**
+   * Parses the given job configuration key-value pair.
+   *
+   * @param key the job configuration key
+   * @param value the value configured for the key
+   * @return a {@link DataType} wrapping the value if this parser can parse it,
+   *         'null' otherwise
+   */
+  DataType<?> parseJobProperty(String key, String value);
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes.util;
+
+import java.lang.reflect.Field;
+import java.text.DecimalFormat;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.tools.rumen.datatypes.*;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * A default parser for MapReduce job configuration properties.
+ * MapReduce job configuration properties are represented as key-value pairs. 
+ * Each key represents a configuration knob which controls or affects the 
+ * behavior of a MapReduce job or a job's task. The value associated with the 
+ * configuration key represents its value. Some of the keys are deprecated. As a
+ * result of deprecation some keys change or are preferred over other keys, 
+ * across versions. {@link MapReduceJobPropertiesParser} is a utility class that
+ * parses MapReduce job configuration properties and converts the value into a 
+ * well defined {@link DataType}. Users can use the
+ * {@link #parseJobProperty(String, String)} API to process job 
+ * configuration parameters. This API will parse a job property represented as a
+ * key-value pair and return the value wrapped inside a {@link DataType}. 
+ * Callers can then use the returned {@link DataType} for further processing.
+ * 
+ * {@link MapReduceJobPropertiesParser} relies on the key name to decide which
+ * {@link DataType} to wrap the value with. Values for keys representing 
+ * job-name, queue-name, user-name etc are wrapped inside {@link JobName}, 
+ * {@link QueueName}, {@link UserName} etc respectively. Keys ending with *dir* 
+ * are considered to be directories and hence get wrapped inside 
+ * {@link FileName}. Similarly keys ending with *codec*, *log*, *class* etc are
+ * also handled accordingly. Values representing basic java data-types like 
+ * integer, float, double, boolean etc are wrapped inside 
+ * {@link DefaultDataType}. If the key represents some jvm-level settings then 
+ * only standard settings are extracted and get wrapped inside 
+ * {@link DefaultDataType}. Currently only '-Xmx' and '-Xms' settings are 
+ * considered while the rest are ignored.
+ * 
+ * Note that the {@link #parseJobProperty(String, String)} API maps the keys to
+ * a configuration parameter listed in {@link MRJobConfig}. This not only
+ * filters non-framework specific keys, thus ignoring user-specific and
+ * hard-to-parse keys, but also provides a consistent view for all possible
+ * inputs. So if users invoke the {@link #parseJobProperty(String, String)} API
+ * with either {@code <"mapreduce.job.user.name", "bob">} or
+ * {@code <"user.name", "bob">}, then the result would be a {@link UserName}
+ * {@link DataType} wrapping the user-name "bob".
+ */
+@SuppressWarnings("deprecation")
+public class MapReduceJobPropertiesParser implements JobPropertyParser {
+  // public constants of MRJobConfig, probed when resolving the latest key name
+  private final Field[] mrFields = MRJobConfig.class.getFields();
+  private final DecimalFormat format = new DecimalFormat();
+  private final JobConf configuration = new JobConf(false);
+  private static final Pattern MAX_HEAP_PATTERN = 
+    Pattern.compile("-Xmx[0-9]+[kKmMgGtT]?+");
+  private static final Pattern MIN_HEAP_PATTERN = 
+    Pattern.compile("-Xms[0-9]+[kKmMgGtT]?+");
+  // turn off the warning w.r.t deprecated mapreduce keys
+  static {
+    Logger.getLogger(Configuration.class).setLevel(Level.OFF);
+  }
+    
+  // Accepts a key if there is a corresponding key in the current mapreduce
+  // configuration
+  private boolean accept(String key) {
+    return getLatestKeyName(key) != null;
+  }
+  
+  // Resolves the given key to the matching key name declared in MRJobConfig,
+  // or to null when none of the declared keys maps to it.
+  // A single cached JobConf is reused across calls purely for performance;
+  // it is populated on entry and always emptied again before returning.
+  private String getLatestKeyName(String key) {
+    // seed the scratch configuration with the key under test
+    configuration.set(key, key);
+    try {
+      String latest = null;
+      // probe every MRJobConfig constant until one resolves to the seeded key
+      for (Field field : mrFields) {
+        String candidate = field.get(field.getName()).toString();
+        if (configuration.get(candidate) != null) {
+          latest = candidate;
+          break;
+        }
+      }
+      return latest;
+    } catch (IllegalAccessException iae) {
+      throw new RuntimeException(iae);
+    } finally {
+      // leave the cached configuration empty for the next lookup
+      configuration.clear();
+    }
+  }
+  
+  @Override
+  public DataType<?> parseJobProperty(String key, String value) {
+    // keys unknown to the current mapreduce configuration are filtered out
+    if (!accept(key)) {
+      return null;
+    }
+    return fromString(key, value);
+  }
+  
+  /**
+   * Partitions the space-separated java options by the '-Xmx' heap setting.
+   *
+   * @param javaOptions the raw option string; split on single spaces
+   * @param heapOpts receives every token in which a '-Xmx' setting is found
+   * @param others receives all the remaining tokens
+   */
+  public static void extractMaxHeapOpts(String javaOptions, 
+                                        List<String> heapOpts, 
+                                        List<String> others) {
+    for (String token : javaOptions.split(" ")) {
+      boolean hasMaxHeap = MAX_HEAP_PATTERN.matcher(token).find();
+      (hasMaxHeap ? heapOpts : others).add(token);
+    }
+  }
+  
+  /**
+   * Partitions the space-separated java options by the '-Xms' heap setting.
+   *
+   * @param javaOptions the raw option string; split on single spaces
+   * @param heapOpts receives every token in which a '-Xms' setting is found
+   * @param others receives all the remaining tokens
+   */
+  public static void extractMinHeapOpts(String javaOptions,  
+      List<String> heapOpts,  List<String> others) {
+    for (String token : javaOptions.split(" ")) {
+      boolean hasMinHeap = MIN_HEAP_PATTERN.matcher(token).find();
+      (hasMinHeap ? heapOpts : others).add(token);
+    }
+  }
+  
+  // Wraps the value in the DataType implied by the latest name of the key and
+  // by the form of the value; null if the value is null or unclassifiable.
+  private DataType<?> fromString(String key, String value) {
+    if (value != null) {
+      String latestKey = getLatestKeyName(key);
+      // check known configs, starting with the job-name
+      if (MRJobConfig.JOB_NAME.equals(latestKey)) {
+        return new JobName(value);
+      }
+      // user-name
+      if (MRJobConfig.USER_NAME.equals(latestKey)) {
+        return new UserName(value);
+      }
+      // queue-name
+      if (MRJobConfig.QUEUE_NAME.equals(latestKey)) {
+        return new QueueName(value);
+      }
+      // jvm options: only the -Xmx and -Xms settings are retained
+      if (MRJobConfig.MAP_JAVA_OPTS.equals(latestKey) 
+          || MRJobConfig.REDUCE_JAVA_OPTS.equals(latestKey)) {
+        List<String> heapOptions = new ArrayList<String>();
+        extractMaxHeapOpts(value, heapOptions, new ArrayList<String>());
+        extractMinHeapOpts(value, heapOptions, new ArrayList<String>());
+        return new DefaultDataType(StringUtils.join(heapOptions, ' '));
+      }
+
+      //TODO compression?
+      //TODO Other job configs like FileOutputFormat/FileInputFormat etc
+
+      // check if the value is a number (parse() accepts a numeric prefix)
+      try {
+        format.parse(value);
+        return new DefaultDataType(value);
+      } catch (ParseException ignored) {
+        // not a number; fall through to the remaining checks
+      }
+
+      // check if the config parameter represents a boolean
+      if ("true".equals(value) || "false".equals(value)) {
+        return new DefaultDataType(value);
+      }
+
+      // check if the config parameter represents a class
+      if (latestKey.endsWith(".class") || latestKey.endsWith(".codec")) {
+        return new ClassName(value);
+      }
+
+      // handle distributed cache sizes and timestamps
+      if (latestKey.endsWith("sizes") 
+          || latestKey.endsWith(".timestamps")) {
+        return new DefaultDataType(value);
+      }
+
+      // check if the config parameter represents a file-system path
+      //TODO: Make this concrete .location .path .dir .jar?
+      if (latestKey.endsWith(".dir") || latestKey.endsWith(".location") 
+          || latestKey.endsWith(".jar") || latestKey.endsWith(".path") 
+          || latestKey.endsWith(".logfile") || latestKey.endsWith(".file")
+          || latestKey.endsWith(".files") || latestKey.endsWith(".archives")) {
+        try {
+          return new FileName(value);
+        } catch (Exception ioe) {} // best effort: fall through to null
+      }
+    }
+
+    return null;
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.serializers;
+
+import java.io.IOException;
+
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.SerializerProvider;
+
+/**
+ * A JSON serializer for Strings that blocks the value by always writing null.
+ */
+public class BlockingSerializer extends JsonSerializer<String> {
+  @Override
+  public void serialize(String object, JsonGenerator jGen, SerializerProvider sProvider) 
+  throws IOException, JsonProcessingException {
+    jGen.writeNull();
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.serializers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.rumen.datatypes.AnonymizableDataType;
+import org.apache.hadoop.tools.rumen.state.StatePool;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.SerializerProvider;
+
+/**
+ * Default Rumen JSON serializer that writes anonymized values.
+ */
+@SuppressWarnings("unchecked")
+public class DefaultAnonymizingRumenSerializer 
+  extends JsonSerializer<AnonymizableDataType> {
+  private final StatePool statePool;
+  private final Configuration conf;
+  
+  public DefaultAnonymizingRumenSerializer(StatePool statePool, 
+                                           Configuration conf) {
+    this.statePool = statePool;
+    this.conf = conf;
+  }
+  
+  @Override
+  public void serialize(AnonymizableDataType object, JsonGenerator jGen, 
+                        SerializerProvider sProvider) 
+  throws IOException, JsonProcessingException {
+    Object val = object.getAnonymizedValue(statePool, conf);
+    // strings are written directly; anything else is left to the mapper
+    if (val instanceof String) {
+      jGen.writeString(val.toString());
+    } else {
+      jGen.writeObject(val);
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.serializers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.tools.rumen.datatypes.DataType;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.SerializerProvider;
+
+/** Default Rumen JSON serializer: writes the wrapped value of a DataType. */
+@SuppressWarnings("unchecked")
+public class DefaultRumenSerializer extends JsonSerializer<DataType> {
+  @Override
+  public void serialize(DataType object, JsonGenerator jGen, SerializerProvider sProvider) 
+  throws IOException, JsonProcessingException {
+    Object data = object.getValue();
+    // strings are written directly; other values are left to the generator
+    if (data instanceof String) {
+      jGen.writeString(data.toString());
+    } else {
+      jGen.writeObject(data);
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.serializers;
+
+import java.io.IOException;
+
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.SerializerProvider;
+
+/** Rumen JSON serializer for serializing objects using the toString() API. */
+public class ObjectStringSerializer<T> extends JsonSerializer<T> {
+  @Override
+  public void serialize(T object, JsonGenerator jGen, SerializerProvider sProvider) 
+  throws IOException, JsonProcessingException {
+    // emit the object's string form as a JSON string
+    jGen.writeString(object.toString());
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.state;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/**
+ * A unit of state that is managed by {@link StatePool}.
+ * 
+ * {@link State} objects should be persistable. They are currently persisted 
+ * using the Jackson JSON library, so implementors of this interface should 
+ * take care when defining their public setter and getter APIs, and mark 
+ * members that must not be persisted with {@link JsonIgnore}.
+ */
+public interface State {
+  /**
+   * Tells whether the state has been updated since creation (or reload).
+   */
+  @JsonIgnore
+  boolean isUpdated();
+  
+  /**
+   * Returns the name of this state.
+   */
+  String getName();
+  
+  /**
+   * Assigns the name of this state.
+   */
+  void setName(String name);
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.state;
+
+import java.io.IOException;
+
+import org.apache.hadoop.tools.rumen.state.StatePool.StatePair;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.DeserializationContext;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.deser.StdDeserializer;
+import org.codehaus.jackson.node.ObjectNode;
+
+/**
+ * Rumen JSON deserializer for {@link StatePair}s, the wrappers in which 
+ * {@link State} objects are stored.
+ * 
+ * A state-pair is read as a JSON object with a "className" field naming the
+ * concrete {@link State} implementation and a "state" field holding the JSON
+ * form of that implementation.
+ */
+public class StateDeserializer extends StdDeserializer<StatePair> {
+  public StateDeserializer() {
+    super(StatePair.class);
+  }
+
+  /**
+   * Reads one state-pair object from the parser and rebuilds the wrapped 
+   * {@link State} using the class named in the "className" field.
+   * 
+   * @param parser the parser positioned at the state-pair object; its codec
+   *               must be an {@link ObjectMapper}
+   * @param context the deserialization context (unused)
+   * @return a new {@link StatePair} wrapping the deserialized state
+   * @throws RuntimeException if "className" or "state" is missing or empty, 
+   *                          if the class cannot be loaded, or if it does not
+   *                          implement {@link State}
+   */
+  @Override
+  public StatePair deserialize(JsonParser parser, 
+                               DeserializationContext context)
+  throws IOException, JsonProcessingException {
+    ObjectMapper mapper = (ObjectMapper) parser.getCodec();
+    // read the whole state-pair as an object tree
+    ObjectNode statePairObject = (ObjectNode) mapper.readTree(parser);
+
+    // path() returns a 'missing' node (text value: null) rather than null
+    // when the field is absent, so one null check covers both the missing
+    // and the non-textual case
+    String className = statePairObject.path("className").getTextValue();
+    if (className == null) {
+      throw new RuntimeException("Invalid state-pair! Missing or non-textual" 
+                                 + " 'className'.");
+    }
+    className = className.trim();
+
+    Class<?> stateClass = null;
+    try {
+      stateClass = Class.forName(className);
+    } catch (ClassNotFoundException cnfe) {
+      throw new RuntimeException("Invalid classname!", cnfe);
+    }
+
+    // NOTE(review): the class name is taken from the persisted file. Reject
+    // anything that is not a State before Jackson gets to instantiate it.
+    if (!State.class.isAssignableFrom(stateClass)) {
+      throw new RuntimeException("Invalid classname! " + className 
+                                 + " does not implement " 
+                                 + State.class.getName());
+    }
+
+    if (!statePairObject.has("state")) {
+      throw new RuntimeException("Invalid state-pair! Missing 'state' for " 
+                                 + className);
+    }
+    String stateJsonString = statePairObject.get("state").toString();
+    State state = 
+      mapper.readValue(stateJsonString, stateClass.asSubclass(State.class));
+    if (state == null) {
+      // a JSON null for 'state' deserializes to a null object
+      throw new RuntimeException("Invalid state-pair! Empty 'state' for " 
+                                 + className);
+    }
+
+    return new StatePair(state);
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.state;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.Anonymizer;
+import org.apache.hadoop.tools.rumen.datatypes.DataType;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.Version;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.map.module.SimpleModule;
+
+/**
+ * A pool of states. States used by {@link DataType}'s can be managed by the 
+ * {@link StatePool}. {@link StatePool} also supports persistence. Persistence
+ * is key to share states across multiple {@link Anonymizer} runs.
+ */
+@SuppressWarnings("unchecked")
+public class StatePool {
+  // version reported by getVersion() and enforced by setVersion()
+  private static final long VERSION = 1L;
+  // set when a state is added, or once any pooled state reports an update
+  private boolean isUpdated = false;
+  // set at the end of initialize(); guards against a second initialization
+  private boolean isInitialized = false;
+  // configuration passed to initialize(); used to resolve the file-system
+  private Configuration conf;
+  
+  // persistence configuration
+  // key for the directory holding the persisted states
+  public static final String DIR_CONFIG = "rumen.anonymization.states.dir";
+  // key for the boolean that enables reloading in initialize()
+  public static final String RELOAD_CONFIG = 
+    "rumen.anonymization.states.reload";
+  // key for the boolean that enables persist()
+  public static final String PERSIST_CONFIG = 
+    "rumen.anonymization.states.persist";
+  
+  // internal state management configs
+  // name of the committed state file inside the state directory
+  private static final String COMMIT_STATE_FILENAME = "latest";
+  // name of the file the current state is written to before being committed
+  private static final String CURRENT_STATE_FILENAME = "temp";
+  
+  // formatted in initialize(); persist() uses it as the file name under which
+  // the previous commit is copied
+  private String timeStamp;
+  // state directory; only set when reload or persist is enabled
+  private Path persistDirPath;
+  // value of RELOAD_CONFIG (default false)
+  private boolean reload;
+  // value of PERSIST_CONFIG (default false)
+  private boolean persist;
+  /**
+   * Couples a {@link State} with the name of its concrete class. The class 
+   * name is recorded next to the state so that both are available as 
+   * properties of the pair.
+   */
+  public static class StatePair {
+    private String className;
+    private State state;
+
+    /**
+     * Wraps the given state, recording the name of its runtime class.
+     */
+    public StatePair(State state) {
+      final Class<? extends State> implementation = state.getClass();
+      this.className = implementation.getName();
+      this.state = state;
+    }
+
+    /** Returns the recorded class name. */
+    public String getClassName() {
+      return this.className;
+    }
+
+    /** Replaces the recorded class name. */
+    public void setClassName(String className) {
+      this.className = className;
+    }
+
+    /** Returns the wrapped state. */
+    public State getState() {
+      return this.state;
+    }
+
+    /** Replaces the wrapped state; the class name is left untouched. */
+    public void setState(State state) {
+      this.state = state;
+    }
+  }
+  
+  /**
+   * Caches {@link State}s, keyed by the name of the class identifying them.
+   */
+  private HashMap<String, StatePair> pool = new HashMap<String, StatePair>();
+  
+  /**
+   * Adds a state to the pool under the name of the given class and marks the 
+   * pool as updated.
+   * 
+   * @param id the class whose name is used as the pool key
+   * @param state the state to add
+   * @throws RuntimeException if a state is already registered for the class
+   */
+  public void addState(Class id, State state) {
+    final String key = id.getName();
+    if (pool.containsKey(key)) {
+      throw new RuntimeException("State '" + state.getName() + "' added for the" 
+          + " class " + key + " already exists!");
+    }
+
+    isUpdated = true;
+    StatePair wrapped = new StatePair(state);
+    pool.put(key, wrapped);
+  }
+  
+  /**
+   * Looks up the state registered under the name of the given class.
+   * 
+   * @return the registered state, or null if the pool has no entry for the 
+   *         class
+   */
+  public State getState(Class clazz) {
+    final String key = clazz.getName();
+    if (!pool.containsKey(key)) {
+      return null;
+    }
+    return pool.get(key).getState();
+  }
+  
+  /**
+   * Tells whether the pool is dirty, i.e. a state was added or one of the 
+   * pooled states reports itself as updated. A positive answer is remembered.
+   * Also used for testing.
+   */
+  @JsonIgnore
+  public boolean isUpdated() {
+    if (isUpdated) {
+      return true;
+    }
+    // a single changed state is enough to make the pool dirty
+    for (StatePair entry : pool.values()) {
+      if (entry.getState().isUpdated()) {
+        isUpdated = true;
+        break;
+      }
+    }
+    return isUpdated;
+  }
+  
+  /**
+   * Initializes the {@link StatePool}. This API also reloads the previously
+   * persisted state when reloading is enabled. Note that the 
+   * {@link StatePool} should be initialized only once.
+   * 
+   * @param conf configuration supplying {@link #DIR_CONFIG}, 
+   *             {@link #RELOAD_CONFIG} and {@link #PERSIST_CONFIG}
+   * @throws RuntimeException if the pool is already initialized, or if reload 
+   *                          or persist is enabled but no state directory is 
+   *                          configured
+   * @throws Exception if reloading the persisted state fails
+   */
+  public void initialize(Configuration conf) throws Exception {
+    if (isInitialized) {
+      throw new RuntimeException("StatePool is already initialized!");
+    }
+
+    this.conf = conf;
+    String persistDir = conf.get(DIR_CONFIG);
+    reload = conf.getBoolean(RELOAD_CONFIG, false);
+    persist = conf.getBoolean(PERSIST_CONFIG, false);
+
+    // the state directory is only needed when reloading or persisting
+    if (reload || persist) {
+      System.out.println("State Manager initializing. State directory : " 
+                         + persistDir);
+      System.out.println("Reload:" + reload + " Persist:" + persist);
+      if (persistDir == null) {
+        throw new RuntimeException("No state persist directory configured!" 
+                                   + " Disable persistence.");
+      } else {
+        this.persistDirPath = new Path(persistDir);
+      }
+    } else {
+      System.out.println("State Manager disabled.");
+    }
+
+    // reload (no-op unless reloading is enabled)
+    reload();
+
+    // now set the timestamp; persist() uses it to name the copy of the 
+    // previous commit. 'HH' (0-23) is used instead of 'hh' (1-12): without an
+    // AM/PM marker the 12-hour form yields the same name for runs that are 
+    // twelve hours apart, and the later copy would overwrite the earlier one.
+    DateFormat formatter = 
+      new SimpleDateFormat("dd-MMM-yyyy-HH'H'-mm'M'-ss'S'");
+    timeStamp = formatter.format(Calendar.getInstance().getTime());
+
+    isInitialized = true;
+  }
+  
+  /**
+   * Reloads the committed state file from the state directory. Does nothing 
+   * unless reloading is enabled.
+   * 
+   * @throws RuntimeException if reloading is enabled but the committed state
+   *                          file does not exist
+   */
+  private void reload() throws Exception {
+    if (!reload) {
+      return;
+    }
+
+    // locate the committed state inside the state directory
+    Path committed = new Path(persistDirPath, COMMIT_STATE_FILENAME);
+    FileSystem fileSystem = committed.getFileSystem(conf);
+    if (!fileSystem.exists(committed)) {
+      throw new RuntimeException("No latest state persist directory found!" 
+                                 + " Disable persistence and run.");
+    }
+    reloadState(committed, conf);
+  }
+  
+  /**
+   * Reads the pooled states from the given file, if it exists. A message is
+   * printed and nothing is loaded when the file is absent.
+   * 
+   * @param stateFile the file holding the persisted states
+   * @param conf configuration used to resolve the file's file-system
+   */
+  private void reloadState(Path stateFile, Configuration conf) 
+  throws Exception {
+    FileSystem fs = stateFile.getFileSystem(conf);
+    if (fs.exists(stateFile)) {
+      System.out.println("Reading state from " + stateFile.toString());
+      FSDataInputStream in = fs.open(stateFile);
+      try {
+        read(in);
+      } finally {
+        // release the stream even when the state file cannot be parsed
+        in.close();
+      }
+    } else {
+      System.out.println("No state information found for " + stateFile);
+    }
+  }
+  
+  /**
+   * Deserializes a {@link StatePool} from the given JSON input and adopts its
+   * states as the states of this pool.
+   * 
+   * @param in the input to read from; must be a {@link DataInputStream}
+   * @throws RuntimeException if this pool already holds states (see 
+   *                          {@link #setStates(HashMap)})
+   */
+  private void read(DataInput in) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(
+        DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+
+    // define a module
+    SimpleModule module = new SimpleModule("State Serializer",  
+        new Version(0, 1, 1, "FINAL"));
+    // add the state deserializer
+    module.addDeserializer(StatePair.class, new StateDeserializer());
+
+    // register the module with the object-mapper
+    mapper.registerModule(module);
+
+    JsonParser parser = 
+      mapper.getJsonFactory().createJsonParser((DataInputStream)in);
+    try {
+      StatePool statePool = mapper.readValue(parser, StatePool.class);
+      this.setStates(statePool.getStates());
+    } finally {
+      // close the parser even when parsing or adopting the states fails
+      parser.close();
+    }
+  }
+  
+  /**
+   * Persists the current state to the state directory. The state will be 
+   * persisted to the 'latest' file in the state directory.
+   * 
+   * Nothing is done when persistence is disabled or when the pool has not 
+   * been updated. Otherwise the state is first written to the 'temp' file, 
+   * any existing 'latest' file is copied to a file named after the 
+   * initialization timestamp, and finally 'temp' replaces 'latest'.
+   */
+  public void persist() throws IOException {
+    if (!persist) {
+      return;
+    }
+    if (isUpdated()) {
+      System.out.println("State is updated! Committing.");
+      Path currStateFile = new Path(persistDirPath, CURRENT_STATE_FILENAME);
+      Path commitStateFile = new Path(persistDirPath, COMMIT_STATE_FILENAME);
+      FileSystem fs = currStateFile.getFileSystem(conf);
+
+      System.out.println("Starting the persist phase. Persisting to " 
+                         + currStateFile.toString());
+      // persist current state 
+      //  write the contents of the current state to the current(temp) file
+      FSDataOutputStream out = fs.create(currStateFile, true);
+      try {
+        write(out);
+      } finally {
+        // release the stream even when serialization fails
+        out.close();
+      }
+
+      System.out.println("Persist phase over. The best known un-committed state"
+                         + " is located at " + currStateFile.toString());
+
+      // commit (phase-1) 
+      //  copy the previous commit file to the relocation file
+      if (fs.exists(commitStateFile)) {
+        Path commitRelocationFile = new Path(persistDirPath, timeStamp);
+        System.out.println("Starting the pre-commit phase. Moving the previous " 
+            + "best known state to " + commitRelocationFile.toString());
+        // copy the commit file to the relocation file; the source is kept
+        FileUtil.copy(fs,commitStateFile, fs, commitRelocationFile, false, 
+                      conf);
+      }
+
+      // commit (phase-2)
+      //  the temp file replaces the commit file and is deleted afterwards
+      System.out.println("Starting the commit phase. Committing the states in " 
+                         + currStateFile.toString());
+      FileUtil.copy(fs, currStateFile, fs, commitStateFile, true, true, conf);
+
+      System.out.println("Commit phase successful! The best known committed " 
+                         + "state is located at " + commitStateFile.toString());
+    } else {
+      System.out.println("State not updated! No commit required.");
+    }
+  }
+  
+  /**
+   * Serializes this {@link StatePool} to the given output as pretty-printed
+   * JSON in UTF-8.
+   * 
+   * @param out the output to write to; must be a {@link DataOutputStream}
+   */
+  private void write(DataOutput out) throws IOException {
+    // This is just a JSON experiment
+    System.out.println("Dumping the StatePool's in JSON format.");
+    ObjectMapper outMapper = new ObjectMapper();
+    outMapper.configure(
+        SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    // define a module; no custom serializers are added to it
+    SimpleModule module = new SimpleModule("State Serializer",  
+        new Version(0, 1, 1, "FINAL"));
+
+    // register the module with the object-mapper
+    outMapper.registerModule(module);
+
+    JsonFactory outFactory = outMapper.getJsonFactory();
+    JsonGenerator jGen = 
+      outFactory.createJsonGenerator((DataOutputStream)out, JsonEncoding.UTF8);
+    try {
+      jGen.useDefaultPrettyPrinter();
+      jGen.writeObject(this);
+    } finally {
+      // close the generator even when serialization fails
+      jGen.close();
+    }
+  }
+  
+  /**
+   * Getters and setters for JSON serialization
+   */
+  
+  /**
+   * Reports the version of the {@link StatePool} format. Meant to be called 
+   * only by the Jackson JSON serializer.
+   */
+  public long getVersion() {
+    return StatePool.VERSION;
+  }
+  
+  /**
+   * Verifies that the given version matches the version of this 
+   * {@link StatePool}; nothing is stored. Meant to be called only by the 
+   * Jackson JSON deserializer.
+   * 
+   * @throws RuntimeException if the versions differ
+   */
+  public void setVersion(long version) {
+    if (version == VERSION) {
+      return;
+    }
+    throw new RuntimeException("Version mismatch! Expected " + VERSION 
+                               + " got " + version);
+  }
+  
+  /**
+   * Exposes the pooled states, keyed by class name. The internal map itself 
+   * is returned, not a copy. Meant to be called only by the Jackson JSON 
+   * serializer.
+   */
+  public HashMap<String, StatePair> getStates() {
+    return this.pool;
+  }
+  
+  /**
+   * Installs the given map as the pool. The map is adopted as-is, without 
+   * copying. Meant to be called only by the Jackson JSON deserializer.
+   * 
+   * @throws RuntimeException if the pool already contains states
+   */
+  public void setStates(HashMap<String, StatePair> states) {
+    if (!pool.isEmpty()) {
+      throw new RuntimeException("Pool not empty!");
+    }
+
+    //TODO Should we do a clone?
+    this.pool = states;
+  }
+}
\ No newline at end of file