You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2011/12/16 15:21:00 UTC
svn commit: r1215141 [4/4] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./ ivy/
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
src/docs/src/documentation/content/xdocs/
src/test/mapred/org/apache/hadoop/tools/rumen/ src/tools/org...
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/QueueName.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+
+/**
+ * Represents the name of a job queue.
+ *
+ * The raw name is exposed through {@link #getValue()}, while
+ * {@link #getPrefix()} supplies the "queue" prefix consumed by
+ * {@link DefaultAnonymizableDataType} (presumably when it builds anonymized
+ * replacement names).
+ */
+public class QueueName extends DefaultAnonymizableDataType {
+  private final String queueName;
+
+  /**
+   * @param queueName the queue name to wrap
+   */
+  public QueueName(String queueName) {
+    this.queueName = queueName;
+  }
+
+  @Override
+  protected String getPrefix() {
+    return "queue";
+  }
+
+  @Override
+  public String getValue() {
+    return queueName;
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/UserName.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes;
+
+/**
+ * Represents the name of a user.
+ *
+ * The raw name is exposed through {@link #getValue()}, while
+ * {@link #getPrefix()} supplies the "user" prefix consumed by
+ * {@link DefaultAnonymizableDataType} (presumably when it builds anonymized
+ * replacement names).
+ */
+public class UserName extends DefaultAnonymizableDataType {
+  private final String userName;
+
+  /**
+   * @param userName the user name to wrap
+   */
+  public UserName(String userName) {
+    this.userName = userName;
+  }
+
+  @Override
+  protected String getPrefix() {
+    return "user";
+  }
+
+  @Override
+  public String getValue() {
+    return userName;
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/DefaultJobPropertiesParser.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes.util;
+
+import org.apache.hadoop.tools.rumen.datatypes.DataType;
+import org.apache.hadoop.tools.rumen.datatypes.DefaultDataType;
+
+/**
+ * A simple job property parser that acts like a pass-through filter.
+ */
+public class DefaultJobPropertiesParser implements JobPropertyParser {
+ @Override
+ public DataType<?> parseJobProperty(String key, String value) {
+ return new DefaultDataType(value);
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/JobPropertyParser.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes.util;
+
+import org.apache.hadoop.tools.rumen.datatypes.DataType;
+import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
+
+/**
+ * A {@link JobProperties} parsing utility. Implementations turn a single job
+ * configuration key-value pair into a typed {@link DataType}.
+ */
+public interface JobPropertyParser {
+  /**
+   * Parse the specified job configuration key-value pair.
+   *
+   * @param key the configuration key
+   * @param value the configuration value
+   * @return a {@link DataType} wrapping the value if this parser can parse
+   *         it, or {@code null} otherwise
+   */
+  DataType<?> parseJobProperty(String key, String value);
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/datatypes/util/MapReduceJobPropertiesParser.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.datatypes.util;
+
+import java.lang.reflect.Field;
+import java.text.DecimalFormat;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.tools.rumen.datatypes.*;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * A default parser for MapReduce job configuration properties.
+ * MapReduce job configuration properties are represented as key-value pairs.
+ * Each key represents a configuration knob which controls or affects the
+ * behavior of a MapReduce job or a job's task. The value associated with the
+ * configuration key represents its value. Some of the keys are deprecated. As a
+ * result of deprecation some keys change or are preferred over other keys,
+ * across versions. {@link MapReduceJobPropertiesParser} is a utility class that
+ * parses MapReduce job configuration properties and converts the value into a
+ * well defined {@link DataType}. Users can use the
+ * {@link MapReduceJobPropertiesParser#parseJobProperty(String, String)} API to
+ * process job configuration parameters. This API will parse a job property
+ * represented as a key-value pair and return the value wrapped inside a
+ * {@link DataType}. Callers can then use the returned {@link DataType} for
+ * further processing.
+ *
+ * {@link MapReduceJobPropertiesParser} relies on the key name to decide which
+ * {@link DataType} to wrap the value with. Values for keys representing
+ * job-name, queue-name and user-name are wrapped inside {@link JobName},
+ * {@link QueueName} and {@link UserName} respectively. If the key represents
+ * map/reduce JVM options then only the standard heap settings ('-Xmx' and
+ * '-Xms') are extracted and wrapped inside {@link DefaultDataType}; the rest
+ * are ignored. Values that parse as numbers or booleans are wrapped inside
+ * {@link DefaultDataType}. Keys ending with <i>.class</i> or <i>.codec</i> are
+ * wrapped inside {@link ClassName}. Distributed cache <i>sizes</i> and
+ * <i>.timestamps</i> are wrapped inside {@link DefaultDataType}. Keys ending
+ * with <i>.dir</i>, <i>.location</i>, <i>.jar</i>, <i>.path</i>,
+ * <i>.logfile</i>, <i>.file</i>, <i>.files</i> or <i>.archives</i> are
+ * considered file-system paths and hence get wrapped inside {@link FileName}.
+ *
+ * Note that the
+ * {@link MapReduceJobPropertiesParser#parseJobProperty(String, String)} API
+ * maps the keys to a configuration parameter listed in {@link MRJobConfig}.
+ * This not only filters non-framework specific keys thus ignoring
+ * user-specific and hard-to-parse keys but also provides a consistent view for
+ * all possible inputs. So if users invoke the
+ * {@link MapReduceJobPropertiesParser#parseJobProperty(String, String)} API
+ * with either <"mapreduce.job.user.name", "bob"> or <"user.name", "bob">, then
+ * the result would be a {@link UserName} {@link DataType} wrapping the
+ * user-name "bob".
+ *
+ * Instances are not safe for concurrent use: every lookup temporarily mutates
+ * a shared {@link JobConf} and uses a shared {@link DecimalFormat}.
+ */
+@SuppressWarnings("deprecation")
+public class MapReduceJobPropertiesParser implements JobPropertyParser {
+  // The String constants declared in MRJobConfig, in reflection order. They
+  // never change, so they are collected once instead of on every lookup.
+  private final List<String> mrKeys = getMRJobConfigKeys();
+  private final DecimalFormat format = new DecimalFormat();
+  private final JobConf configuration = new JobConf(false);
+  private static final Pattern MAX_HEAP_PATTERN =
+    Pattern.compile("-Xmx[0-9]+[kKmMgGtT]?+");
+  private static final Pattern MIN_HEAP_PATTERN =
+    Pattern.compile("-Xms[0-9]+[kKmMgGtT]?+");
+
+  // turn off the warning w.r.t deprecated mapreduce keys
+  static {
+    Logger.getLogger(Configuration.class).setLevel(Level.OFF);
+  }
+
+  // Collects the values of the public String constants of MRJobConfig.
+  private static List<String> getMRJobConfigKeys() {
+    List<String> keys = new ArrayList<String>();
+    for (Field f : MRJobConfig.class.getFields()) {
+      // only String constants can name a configuration key
+      if (f.getType() != String.class) {
+        continue;
+      }
+      try {
+        // static field: the instance argument is ignored, hence null
+        Object value = f.get(null);
+        if (value != null) {
+          keys.add(value.toString());
+        }
+      } catch (IllegalAccessException iae) {
+        throw new RuntimeException(iae);
+      }
+    }
+    return keys;
+  }
+
+  // Finds a corresponding key for the specified key in the current mapreduce
+  // setup, or returns null if there is none.
+  // Note that this API uses a cached copy of the Configuration object. This is
+  // purely for performance reasons.
+  private String getLatestKeyName(String key) {
+    // set the specified key; presumably Configuration's deprecated-key mapping
+    // also makes it visible under its current MRJobConfig name
+    configuration.set(key, key);
+    try {
+      // check if keys in MRJobConfig map to the specified key.
+      for (String mrKey : mrKeys) {
+        if (configuration.get(mrKey) != null) {
+          return mrKey;
+        }
+      }
+      return null;
+    } finally {
+      // clean up so the next lookup starts from an empty configuration
+      configuration.clear();
+    }
+  }
+
+  /**
+   * Parses a job property whose key corresponds to a key in
+   * {@link MRJobConfig}.
+   *
+   * @param key the (possibly deprecated) configuration key
+   * @param value the configuration value
+   * @return the value wrapped in a {@link DataType}, or {@code null} if the key
+   *         is not a known MapReduce key, the value is {@code null}, or the
+   *         value cannot be classified
+   */
+  @Override
+  public DataType<?> parseJobProperty(String key, String value) {
+    // resolve the key once and reuse it for the value classification
+    String latestKey = getLatestKeyName(key);
+    if (latestKey == null) {
+      return null;
+    }
+    return fromString(latestKey, value);
+  }
+
+  /**
+   * Extracts the -Xmx heap option from the specified string.
+   *
+   * @param javaOptions space-separated JVM options
+   * @param heapOpts receives the options matching -Xmx
+   * @param others receives all remaining options
+   */
+  public static void extractMaxHeapOpts(String javaOptions,
+                                        List<String> heapOpts,
+                                        List<String> others) {
+    extractHeapOpts(MAX_HEAP_PATTERN, javaOptions, heapOpts, others);
+  }
+
+  /**
+   * Extracts the -Xms heap option from the specified string.
+   *
+   * @param javaOptions space-separated JVM options
+   * @param heapOpts receives the options matching -Xms
+   * @param others receives all remaining options
+   */
+  public static void extractMinHeapOpts(String javaOptions,
+                                        List<String> heapOpts,
+                                        List<String> others) {
+    extractHeapOpts(MIN_HEAP_PATTERN, javaOptions, heapOpts, others);
+  }
+
+  // Splits javaOptions on single spaces and sorts each token into heapOpts
+  // (token contains a match of the pattern) or others.
+  private static void extractHeapOpts(Pattern pattern, String javaOptions,
+                                      List<String> heapOpts,
+                                      List<String> others) {
+    for (String opt : javaOptions.split(" ")) {
+      Matcher matcher = pattern.matcher(opt);
+      if (matcher.find()) {
+        heapOpts.add(opt);
+      } else {
+        others.add(opt);
+      }
+    }
+  }
+
+  // Maps the value of an already-resolved MRJobConfig key to a DataType.
+  // Returns null for a null value or a value that cannot be classified.
+  private DataType<?> fromString(String latestKey, String value) {
+    if (value == null) {
+      return null;
+    }
+
+    // check known configs
+    // job-name
+    if (MRJobConfig.JOB_NAME.equals(latestKey)) {
+      return new JobName(value);
+    }
+    // user-name
+    if (MRJobConfig.USER_NAME.equals(latestKey)) {
+      return new UserName(value);
+    }
+    // queue-name
+    if (MRJobConfig.QUEUE_NAME.equals(latestKey)) {
+      return new QueueName(value);
+    }
+    // JVM options: keep only the heap settings
+    if (MRJobConfig.MAP_JAVA_OPTS.equals(latestKey)
+        || MRJobConfig.REDUCE_JAVA_OPTS.equals(latestKey)) {
+      List<String> heapOptions = new ArrayList<String>();
+      extractMaxHeapOpts(value, heapOptions, new ArrayList<String>());
+      extractMinHeapOpts(value, heapOptions, new ArrayList<String>());
+      return new DefaultDataType(StringUtils.join(heapOptions, ' '));
+    }
+
+    //TODO compression?
+    //TODO Other job configs like FileOutputFormat/FileInputFormat etc
+
+    // check if the config parameter represents a number
+    // NOTE(review): DecimalFormat.parse(String) only requires a parsable
+    // prefix, so values such as "10s" are also accepted here.
+    try {
+      format.parse(value);
+      return new DefaultDataType(value);
+    } catch (ParseException pe) {
+      // not a number; try the remaining checks
+    }
+
+    // check if the config parameter represents a boolean
+    // avoiding exceptions
+    if ("true".equals(value) || "false".equals(value)) {
+      return new DefaultDataType(value);
+    }
+
+    // check if the config parameter represents a class
+    if (latestKey.endsWith(".class") || latestKey.endsWith(".codec")) {
+      return new ClassName(value);
+    }
+
+    // handle distributed cache sizes and timestamps
+    if (latestKey.endsWith("sizes")
+        || latestKey.endsWith(".timestamps")) {
+      return new DefaultDataType(value);
+    }
+
+    // check if the config parameter represents a file-system path
+    //TODO: Make this concrete .location .path .dir .jar?
+    if (latestKey.endsWith(".dir") || latestKey.endsWith(".location")
+        || latestKey.endsWith(".jar") || latestKey.endsWith(".path")
+        || latestKey.endsWith(".logfile") || latestKey.endsWith(".file")
+        || latestKey.endsWith(".files") || latestKey.endsWith(".archives")) {
+      try {
+        return new FileName(value);
+      } catch (Exception ignored) {
+        // best-effort: an unparsable path is dropped (null below)
+      }
+    }
+
+    return null;
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/BlockingSerializer.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.serializers;
+
+import java.io.IOException;
+
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.SerializerProvider;
+
+/**
+ * A JSON serializer that suppresses String values: whatever String it is
+ * given, it writes a JSON {@code null} in its place.
+ */
+public class BlockingSerializer extends JsonSerializer<String> {
+
+  @Override
+  public void serialize(String object, JsonGenerator jGen,
+                        SerializerProvider sProvider)
+      throws IOException, JsonProcessingException {
+    // deliberately ignore the value so it never reaches the output
+    jGen.writeNull();
+  }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultAnonymizingRumenSerializer.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.serializers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.rumen.datatypes.AnonymizableDataType;
+import org.apache.hadoop.tools.rumen.state.StatePool;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.SerializerProvider;
+
+/**
+ * Rumen JSON serializer that writes the anonymized form of an
+ * {@link AnonymizableDataType} instead of its raw value.
+ */
+@SuppressWarnings("unchecked")
+public class DefaultAnonymizingRumenSerializer
+    extends JsonSerializer<AnonymizableDataType> {
+  private final StatePool statePool;
+  private final Configuration conf;
+
+  /**
+   * @param statePool the state pool passed to every
+   *        {@code getAnonymizedValue} call
+   * @param conf the configuration passed to every
+   *        {@code getAnonymizedValue} call
+   */
+  public DefaultAnonymizingRumenSerializer(StatePool statePool,
+                                           Configuration conf) {
+    this.statePool = statePool;
+    this.conf = conf;
+  }
+
+  @Override
+  public void serialize(AnonymizableDataType object, JsonGenerator jGen,
+                        SerializerProvider sProvider)
+      throws IOException, JsonProcessingException {
+    Object val = object.getAnonymizedValue(statePool, conf);
+    // output the data directly if it's a string
+    if (val instanceof String) {
+      jGen.writeString(val.toString());
+    } else {
+      // let the mapper (JSON generator) handle this anonymized object.
+      jGen.writeObject(val);
+    }
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/DefaultRumenSerializer.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.serializers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.tools.rumen.datatypes.DataType;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.SerializerProvider;
+
+/**
+ * Default Rumen JSON serializer: writes the raw (non-anonymized) value of a
+ * {@link DataType}. String values are written as JSON strings; any other
+ * value is handed to {@code JsonGenerator#writeObject}.
+ */
+@SuppressWarnings("unchecked")
+public class DefaultRumenSerializer extends JsonSerializer<DataType> {
+  @Override
+  public void serialize(DataType object, JsonGenerator jGen,
+                        SerializerProvider sProvider)
+      throws IOException, JsonProcessingException {
+    Object payload = object.getValue();
+    if (!(payload instanceof String)) {
+      jGen.writeObject(payload);
+      return;
+    }
+    jGen.writeString((String) payload);
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/serializers/ObjectStringSerializer.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.serializers;
+
+import java.io.IOException;
+
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.SerializerProvider;
+
+/**
+ * Rumen JSON serializer that writes any object as a JSON string obtained from
+ * its {@code toString()} method.
+ *
+ * @param <T> the type of object being serialized
+ */
+public class ObjectStringSerializer<T> extends JsonSerializer<T> {
+  @Override
+  public void serialize(T object, JsonGenerator jGen,
+                        SerializerProvider sProvider)
+      throws IOException, JsonProcessingException {
+    String text = object.toString();
+    jGen.writeString(text);
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/State.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.state;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/**
+ * Represents a state managed by {@link StatePool}.
+ *
+ * Note that {@link State} objects should be persistable. Currently, they are
+ * persisted using the Jackson JSON library, hence implementors of the
+ * {@link State} interface should be careful while defining their public
+ * setter and getter APIs.
+ */
+public interface State {
+  /**
+   * Returns true if the state was updated since its creation (or reload).
+   * Marked {@link JsonIgnore} so it is not treated as a persisted property.
+   */
+  @JsonIgnore
+  boolean isUpdated();
+
+  /**
+   * Returns the name of the state.
+   */
+  String getName();
+
+  /**
+   * Sets the name of the state.
+   *
+   * @param name the new state name
+   */
+  void setName(String name);
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StateDeserializer.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.state;
+
+import java.io.IOException;
+
+import org.apache.hadoop.tools.rumen.state.StatePool.StatePair;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.DeserializationContext;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.deser.StdDeserializer;
+import org.codehaus.jackson.node.ObjectNode;
+
+/**
+ * Rumen JSON deserializer for a {@link State} object wrapped in a
+ * {@link StatePair}.
+ *
+ * The input is expected to be a JSON object with a "className" field naming
+ * the concrete {@link State} implementation and a "state" field holding that
+ * state's JSON representation.
+ */
+public class StateDeserializer extends StdDeserializer<StatePair> {
+  public StateDeserializer() {
+    super(StatePair.class);
+  }
+
+  @Override
+  public StatePair deserialize(JsonParser parser,
+                               DeserializationContext context)
+      throws IOException, JsonProcessingException {
+    ObjectMapper mapper = (ObjectMapper) parser.getCodec();
+    // read the whole state-pair object tree
+    ObjectNode statePairObject = (ObjectNode) mapper.readTree(parser);
+
+    // path() never returns null and getTextValue() is null for a missing or
+    // non-textual node, so both malformed cases are reported here instead of
+    // surfacing as a NullPointerException.
+    String className = statePairObject.path("className").getTextValue();
+    if (className == null || className.trim().length() == 0) {
+      throw new RuntimeException(
+          "Missing or invalid 'className' field in serialized state!");
+    }
+
+    Class<? extends State> stateClass;
+    try {
+      // asSubclass() rejects non-State classes (ClassCastException) before
+      // Jackson is asked to instantiate them.
+      stateClass = Class.forName(className.trim()).asSubclass(State.class);
+    } catch (ClassNotFoundException cnfe) {
+      throw new RuntimeException("Invalid classname!", cnfe);
+    }
+
+    Object stateNode = statePairObject.get("state");
+    if (stateNode == null) {
+      throw new RuntimeException("Missing 'state' field for state class "
+                                 + className.trim());
+    }
+    State state = mapper.readValue(stateNode.toString(), stateClass);
+
+    return new StatePair(state);
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java?rev=1215141&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/state/StatePool.java Fri Dec 16 14:20:58 2011
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen.state;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.Anonymizer;
+import org.apache.hadoop.tools.rumen.datatypes.DataType;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.Version;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.map.module.SimpleModule;
+
+/**
+ * A pool of states. States used by {@link DataType}'s can be managed by the
+ * {@link StatePool}. {@link StatePool} also supports persistence. Persistence
+ * is key to sharing states across multiple {@link Anonymizer} runs.
+ * <p>
+ * Persistence is driven by {@link #DIR_CONFIG}, {@link #RELOAD_CONFIG} and
+ * {@link #PERSIST_CONFIG}. On {@link #persist()} the pool is written as JSON
+ * to a 'temp' file in the state directory. Any previously committed 'latest'
+ * file is copied aside under a timestamped name. The 'temp' file is then moved
+ * to 'latest'.
+ */
+@SuppressWarnings("unchecked")
+public class StatePool {
+  private static final long VERSION = 1L;
+  private boolean isUpdated = false;
+  private boolean isInitialized = false;
+  private Configuration conf;
+
+  // persistence configuration
+  public static final String DIR_CONFIG = "rumen.anonymization.states.dir";
+  public static final String RELOAD_CONFIG =
+    "rumen.anonymization.states.reload";
+  public static final String PERSIST_CONFIG =
+    "rumen.anonymization.states.persist";
+
+  // internal state management configs
+  private static final String COMMIT_STATE_FILENAME = "latest";
+  private static final String CURRENT_STATE_FILENAME = "temp";
+
+  // Name of the backup copy of the previous commit. Uses a 24-hour clock
+  // (HH): with 'hh' and no AM/PM marker, 01:05 and 13:05 formatted
+  // identically and a later backup could silently overwrite an earlier one.
+  private static final String TIMESTAMP_FORMAT =
+    "dd-MMM-yyyy-HH'H'-mm'M'-ss'S'";
+
+  private String timeStamp;
+  private Path persistDirPath;
+  private boolean reload;
+  private boolean persist;
+
+  /**
+   * A wrapper class that binds the state implementation to its implementing
+   * class name.
+   */
+  public static class StatePair {
+    private String className;
+    private State state;
+
+    public StatePair(State state) {
+      this.className = state.getClass().getName();
+      this.state = state;
+    }
+
+    public String getClassName() {
+      return className;
+    }
+
+    public void setClassName(String className) {
+      this.className = className;
+    }
+
+    public State getState() {
+      return state;
+    }
+
+    public void setState(State state) {
+      this.state = state;
+    }
+  }
+
+  /**
+   * The cached {@link State}s, keyed by the name of the class they were
+   * registered for via {@link #addState(Class, State)}.
+   */
+  private HashMap<String, StatePair> pool = new HashMap<String, StatePair>();
+
+  /**
+   * Registers a state for the given class and marks the pool as updated.
+   *
+   * @throws RuntimeException if a state is already registered for the class
+   */
+  public void addState(Class<?> id, State state) {
+    if (pool.containsKey(id.getName())) {
+      throw new RuntimeException("State '" + state.getName() + "' added for the"
+          + " class " + id.getName() + " already exists!");
+    }
+    isUpdated = true;
+    pool.put(id.getName(), new StatePair(state));
+  }
+
+  /**
+   * Returns the state registered for the given class, or null if none.
+   */
+  public State getState(Class<?> clazz) {
+    StatePair statePair = pool.get(clazz.getName());
+    return statePair == null ? null : statePair.getState();
+  }
+
+  /**
+   * Returns true if a state was added or any cached state reports itself as
+   * updated. Once true, the result is cached.
+   */
+  // For testing
+  @JsonIgnore
+  public boolean isUpdated() {
+    if (!isUpdated) {
+      for (StatePair statePair : pool.values()) {
+        // if one of the states have changed, then the pool is dirty
+        if (statePair.getState().isUpdated()) {
+          isUpdated = true;
+          return true;
+        }
+      }
+    }
+    return isUpdated;
+  }
+
+  /**
+   * Initializes the {@link StatePool}. This API also reloads the previously
+   * persisted state when reload is enabled. Note that the {@link StatePool}
+   * should be initialized only once.
+   *
+   * @throws RuntimeException if already initialized, or if reload/persist is
+   *         enabled without a configured state directory
+   */
+  public void initialize(Configuration conf) throws Exception {
+    if (isInitialized) {
+      throw new RuntimeException("StatePool is already initialized!");
+    }
+
+    this.conf = conf;
+    String persistDir = conf.get(DIR_CONFIG);
+    reload = conf.getBoolean(RELOAD_CONFIG, false);
+    persist = conf.getBoolean(PERSIST_CONFIG, false);
+
+    // a state directory is required if reload or persistence is enabled
+    if (reload || persist) {
+      System.out.println("State Manager initializing. State directory : "
+                         + persistDir);
+      System.out.println("Reload:" + reload + " Persist:" + persist);
+      if (persistDir == null) {
+        throw new RuntimeException("No state persist directory configured!"
+                                   + " Disable persistence.");
+      }
+      this.persistDirPath = new Path(persistDir);
+    } else {
+      System.out.println("State Manager disabled.");
+    }
+
+    // reload previously committed states (no-op unless reload is enabled)
+    reload();
+
+    // now set the timestamp used to name the backup of the previous commit
+    DateFormat formatter = new SimpleDateFormat(TIMESTAMP_FORMAT);
+    timeStamp = formatter.format(Calendar.getInstance().getTime());
+
+    isInitialized = true;
+  }
+
+  private void reload() throws Exception {
+    if (reload) {
+      // Reload persisted entries
+      Path stateFilename = new Path(persistDirPath, COMMIT_STATE_FILENAME);
+      FileSystem fs = stateFilename.getFileSystem(conf);
+      if (fs.exists(stateFilename)) {
+        reloadState(stateFilename, conf);
+      } else {
+        throw new RuntimeException("No latest state persist directory found!"
+                                   + " Disable persistence and run.");
+      }
+    }
+  }
+
+  private void reloadState(Path stateFile, Configuration conf)
+      throws Exception {
+    FileSystem fs = stateFile.getFileSystem(conf);
+    if (fs.exists(stateFile)) {
+      System.out.println("Reading state from " + stateFile.toString());
+      FSDataInputStream in = fs.open(stateFile);
+      try {
+        read(in);
+      } finally {
+        // close even if parsing fails
+        in.close();
+      }
+    } else {
+      System.out.println("No state information found for " + stateFile);
+    }
+  }
+
+  private void read(DataInput in) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(
+        DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+
+    // define a module
+    SimpleModule module = new SimpleModule("State Serializer",
+        new Version(0, 1, 1, "FINAL"));
+    // add the state deserializer
+    module.addDeserializer(StatePair.class, new StateDeserializer());
+
+    // register the module with the object-mapper
+    mapper.registerModule(module);
+
+    // callers pass an FSDataInputStream, which is a DataInputStream
+    JsonParser parser =
+      mapper.getJsonFactory().createJsonParser((DataInputStream) in);
+    try {
+      StatePool statePool = mapper.readValue(parser, StatePool.class);
+      this.setStates(statePool.getStates());
+    } finally {
+      parser.close();
+    }
+  }
+
+  /**
+   * Persists the current state to the state directory. The state will be
+   * persisted to the 'latest' file in the state directory.
+   */
+  public void persist() throws IOException {
+    if (!persist) {
+      return;
+    }
+    if (isUpdated()) {
+      System.out.println("State is updated! Committing.");
+      Path currStateFile = new Path(persistDirPath, CURRENT_STATE_FILENAME);
+      Path commitStateFile = new Path(persistDirPath, COMMIT_STATE_FILENAME);
+      FileSystem fs = currStateFile.getFileSystem(conf);
+
+      System.out.println("Starting the persist phase. Persisting to "
+                         + currStateFile.toString());
+      // persist current state
+      // write the contents of the current state to the current(temp) file
+      FSDataOutputStream out = fs.create(currStateFile, true);
+      try {
+        write(out);
+      } finally {
+        out.close();
+      }
+
+      System.out.println("Persist phase over. The best known un-committed state"
+                         + " is located at " + currStateFile.toString());
+
+      // commit (phase-1)
+      // copy the previous commit file to the relocation file
+      if (fs.exists(commitStateFile)) {
+        Path commitRelocationFile = new Path(persistDirPath, timeStamp);
+        System.out.println("Starting the pre-commit phase. Moving the previous "
+            + "best known state to " + commitRelocationFile.toString());
+        // copy the commit file to the relocation file
+        FileUtil.copy(fs, commitStateFile, fs, commitRelocationFile, false,
+                      conf);
+      }
+
+      // commit (phase-2)
+      // move the temp file over the commit file (delete source, overwrite)
+      System.out.println("Starting the commit phase. Committing the states in "
+                         + currStateFile.toString());
+      FileUtil.copy(fs, currStateFile, fs, commitStateFile, true, true, conf);
+
+      System.out.println("Commit phase successful! The best known committed "
+                         + "state is located at " + commitStateFile.toString());
+    } else {
+      System.out.println("State not updated! No commit required.");
+    }
+  }
+
+  private void write(DataOutput out) throws IOException {
+    // serialize this pool (version + states) as pretty-printed JSON
+    System.out.println("Dumping the StatePool's in JSON format.");
+    ObjectMapper outMapper = new ObjectMapper();
+    outMapper.configure(
+        SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+
+    // callers pass an FSDataOutputStream, which is a DataOutputStream
+    JsonFactory outFactory = outMapper.getJsonFactory();
+    JsonGenerator jGen =
+      outFactory.createJsonGenerator((DataOutputStream) out, JsonEncoding.UTF8);
+    try {
+      jGen.useDefaultPrettyPrinter();
+      jGen.writeObject(this);
+    } finally {
+      jGen.close();
+    }
+  }
+
+  /**
+   * Getters and setters for JSON serialization
+   */
+
+  /**
+   * To be invoked only by the Jackson JSON serializer.
+   */
+  public long getVersion() {
+    return VERSION;
+  }
+
+  /**
+   * To be invoked only by the Jackson JSON deserializer.
+   *
+   * @throws RuntimeException if the persisted version differs from VERSION
+   */
+  public void setVersion(long version) {
+    if (version != VERSION) {
+      throw new RuntimeException("Version mismatch! Expected " + VERSION
+                                 + " got " + version);
+    }
+  }
+
+  /**
+   * To be invoked only by the Jackson JSON serializer.
+   */
+  public HashMap<String, StatePair> getStates() {
+    return pool;
+  }
+
+  /**
+   * To be invoked only by the Jackson JSON deserializer.
+   *
+   * @throws RuntimeException if the pool already holds states
+   */
+  public void setStates(HashMap<String, StatePair> states) {
+    if (!pool.isEmpty()) {
+      throw new RuntimeException("Pool not empty!");
+    }
+
+    // A persisted "states": null would otherwise leave the pool null and make
+    // every later lookup fail; treat it as an empty pool instead.
+    //TODO Should we do a clone?
+    this.pool = (states == null) ? new HashMap<String, StatePair>() : states;
+  }
+}
\ No newline at end of file