You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2012/03/10 02:49:29 UTC
svn commit: r1299136 [2/2] - in /hadoop/common/branches/branch-1: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/
src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/...
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.record.Buffer;
+
+/**
+ * Provides functionality for writing typed bytes.
+ */
+public class TypedBytesOutput {
+
+ private DataOutput out;
+
+ private TypedBytesOutput() {}
+
+ private void setDataOutput(DataOutput out) {
+ this.out = out;
+ }
+
+ private static ThreadLocal tbOut = new ThreadLocal() {
+ protected synchronized Object initialValue() {
+ return new TypedBytesOutput();
+ }
+ };
+
+ /**
+ * Get a thread-local typed bytes output for the supplied {@link DataOutput}.
+ *
+ * @param out data output object
+ * @return typed bytes output corresponding to the supplied
+ * {@link DataOutput}.
+ */
+ public static TypedBytesOutput get(DataOutput out) {
+ TypedBytesOutput bout = (TypedBytesOutput) tbOut.get();
+ bout.setDataOutput(out);
+ return bout;
+ }
+
+ /** Creates a new instance of TypedBytesOutput. */
+ public TypedBytesOutput(DataOutput out) {
+ this.out = out;
+ }
+
+ /**
+ * Writes a Java object as a typed bytes sequence.
+ *
+ * @param obj the object to be written
+ * @throws IOException
+ */
+ public void write(Object obj) throws IOException {
+ if (obj instanceof Buffer) {
+ writeBytes(((Buffer) obj).get());
+ } else if (obj instanceof Byte) {
+ writeByte((Byte) obj);
+ } else if (obj instanceof Boolean) {
+ writeBool((Boolean) obj);
+ } else if (obj instanceof Integer) {
+ writeInt((Integer) obj);
+ } else if (obj instanceof Long) {
+ writeLong((Long) obj);
+ } else if (obj instanceof Float) {
+ writeFloat((Float) obj);
+ } else if (obj instanceof Double) {
+ writeDouble((Double) obj);
+ } else if (obj instanceof String) {
+ writeString((String) obj);
+ } else if (obj instanceof ArrayList) {
+ writeVector((ArrayList) obj);
+ } else if (obj instanceof List) {
+ writeList((List) obj);
+ } else if (obj instanceof Map) {
+ writeMap((Map) obj);
+ } else {
+ throw new RuntimeException("cannot write objects of this type");
+ }
+ }
+
+ /**
+ * Writes a raw sequence of typed bytes.
+ *
+ * @param bytes the bytes to be written
+ * @throws IOException
+ */
+ public void writeRaw(byte[] bytes) throws IOException {
+ out.write(bytes);
+ }
+
+ /**
+ * Writes a raw sequence of typed bytes.
+ *
+ * @param bytes the bytes to be written
+ * @param offset an offset in the given array
+ * @param length number of bytes from the given array to write
+ * @throws IOException
+ */
+ public void writeRaw(byte[] bytes, int offset, int length)
+ throws IOException {
+ out.write(bytes, offset, length);
+ }
+
+ /**
+ * Writes a bytes array as a typed bytes sequence.
+ *
+ * @param bytes the bytes array to be written
+ * @throws IOException
+ */
+ public void writeBytes(byte[] bytes) throws IOException {
+ out.write(Type.BYTES.code);
+ out.writeInt(bytes.length);
+ out.write(bytes);
+ }
+
+ /**
+ * Writes a byte as a typed bytes sequence.
+ *
+ * @param b the byte to be written
+ * @throws IOException
+ */
+ public void writeByte(byte b) throws IOException {
+ out.write(Type.BYTE.code);
+ out.write(b);
+ }
+
+ /**
+ * Writes a boolean as a typed bytes sequence.
+ *
+ * @param b the boolean to be written
+ * @throws IOException
+ */
+ public void writeBool(boolean b) throws IOException {
+ out.write(Type.BOOL.code);
+ out.writeBoolean(b);
+ }
+
+ /**
+ * Writes an integer as a typed bytes sequence.
+ *
+ * @param i the integer to be written
+ * @throws IOException
+ */
+ public void writeInt(int i) throws IOException {
+ out.write(Type.INT.code);
+ out.writeInt(i);
+ }
+
+ /**
+ * Writes a long as a typed bytes sequence.
+ *
+ * @param l the long to be written
+ * @throws IOException
+ */
+ public void writeLong(long l) throws IOException {
+ out.write(Type.LONG.code);
+ out.writeLong(l);
+ }
+
+ /**
+ * Writes a float as a typed bytes sequence.
+ *
+ * @param f the float to be written
+ * @throws IOException
+ */
+ public void writeFloat(float f) throws IOException {
+ out.write(Type.FLOAT.code);
+ out.writeFloat(f);
+ }
+
+ /**
+ * Writes a double as a typed bytes sequence.
+ *
+ * @param d the double to be written
+ * @throws IOException
+ */
+ public void writeDouble(double d) throws IOException {
+ out.write(Type.DOUBLE.code);
+ out.writeDouble(d);
+ }
+
+ /**
+ * Writes a string as a typed bytes sequence.
+ *
+ * @param s the string to be written
+ * @throws IOException
+ */
+ public void writeString(String s) throws IOException {
+ out.write(Type.STRING.code);
+ WritableUtils.writeString(out, s);
+ }
+
+ /**
+ * Writes a vector as a typed bytes sequence.
+ *
+ * @param vector the vector to be written
+ * @throws IOException
+ */
+ public void writeVector(ArrayList vector) throws IOException {
+ writeVectorHeader(vector.size());
+ for (Object obj : vector) {
+ write(obj);
+ }
+ }
+
+ /**
+ * Writes a vector header.
+ *
+ * @param length the number of elements in the vector
+ * @throws IOException
+ */
+ public void writeVectorHeader(int length) throws IOException {
+ out.write(Type.VECTOR.code);
+ out.writeInt(length);
+ }
+
+ /**
+ * Writes a list as a typed bytes sequence.
+ *
+ * @param list the list to be written
+ * @throws IOException
+ */
+ public void writeList(List list) throws IOException {
+ writeListHeader();
+ for (Object obj : list) {
+ write(obj);
+ }
+ writeListFooter();
+ }
+
+ /**
+ * Writes a list header.
+ *
+ * @throws IOException
+ */
+ public void writeListHeader() throws IOException {
+ out.write(Type.LIST.code);
+ }
+
+ /**
+ * Writes a list footer.
+ *
+ * @throws IOException
+ */
+ public void writeListFooter() throws IOException {
+ out.write(Type.MARKER.code);
+ }
+
+ /**
+ * Writes a map as a typed bytes sequence.
+ *
+ * @param map the map to be written
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public void writeMap(Map map) throws IOException {
+ writeMapHeader(map.size());
+ Set<Entry> entries = map.entrySet();
+ for (Entry entry : entries) {
+ write(entry.getKey());
+ write(entry.getValue());
+ }
+ }
+
+ /**
+ * Writes a map header.
+ *
+ * @param length the number of key-value pairs in the map
+ * @throws IOException
+ */
+ public void writeMapHeader(int length) throws IOException {
+ out.write(Type.MAP.code);
+ out.writeInt(length);
+ }
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.hadoop.record.Buffer;
+import org.apache.hadoop.record.Index;
+import org.apache.hadoop.record.RecordInput;
+
+/**
+ * Deserializer for records that reads typed bytes.
+ *
+ * <p>This is the reading counterpart of {@link TypedBytesRecordOutput}:
+ * <ul>
+ * <li>a record is read as a typed bytes list (type code, fields, end
+ * marker);</li>
+ * <li>vectors and maps are read as a type code followed by an element
+ * count;</li>
+ * <li>every primitive field is read as a type code followed by its
+ * value.</li>
+ * </ul>
+ * Typed bytes carry no field names, so all {@code tag} arguments are ignored.
+ */
+public class TypedBytesRecordInput implements RecordInput {
+
+  private TypedBytesInput in;
+
+  private TypedBytesRecordInput() {}
+
+  private void setTypedBytesInput(TypedBytesInput in) {
+    this.in = in;
+  }
+
+  /** Per-thread reusable instance handed out by the static get methods. */
+  private static final ThreadLocal<TypedBytesRecordInput> tbIn =
+      new ThreadLocal<TypedBytesRecordInput>() {
+        @Override
+        protected TypedBytesRecordInput initialValue() {
+          return new TypedBytesRecordInput();
+        }
+      };
+
+  /**
+   * Get a thread-local typed bytes record input for the supplied
+   * {@link TypedBytesInput}. The returned object is reused for every call
+   * made on the same thread.
+   *
+   * @param in typed bytes input object
+   * @return typed bytes record input corresponding to the supplied
+   *         {@link TypedBytesInput}.
+   */
+  public static TypedBytesRecordInput get(TypedBytesInput in) {
+    TypedBytesRecordInput bin = tbIn.get();
+    bin.setTypedBytesInput(in);
+    return bin;
+  }
+
+  /**
+   * Get a thread-local typed bytes record input for the supplied
+   * {@link DataInput}.
+   *
+   * @param in data input object
+   * @return typed bytes record input corresponding to the supplied
+   *         {@link DataInput}.
+   */
+  public static TypedBytesRecordInput get(DataInput in) {
+    return get(TypedBytesInput.get(in));
+  }
+
+  /** Creates a new instance of TypedBytesRecordInput. */
+  public TypedBytesRecordInput(TypedBytesInput in) {
+    this.in = in;
+  }
+
+  /** Creates a new instance of TypedBytesRecordInput. */
+  public TypedBytesRecordInput(DataInput in) {
+    this(new TypedBytesInput(in));
+  }
+
+  // Each primitive reader skips the value's type code and then reads its
+  // payload.
+
+  public boolean readBool(String tag) throws IOException {
+    in.skipType();
+    return in.readBool();
+  }
+
+  public Buffer readBuffer(String tag) throws IOException {
+    in.skipType();
+    return new Buffer(in.readBytes());
+  }
+
+  public byte readByte(String tag) throws IOException {
+    in.skipType();
+    return in.readByte();
+  }
+
+  public double readDouble(String tag) throws IOException {
+    in.skipType();
+    return in.readDouble();
+  }
+
+  public float readFloat(String tag) throws IOException {
+    in.skipType();
+    return in.readFloat();
+  }
+
+  public int readInt(String tag) throws IOException {
+    in.skipType();
+    return in.readInt();
+  }
+
+  public long readLong(String tag) throws IOException {
+    in.skipType();
+    return in.readLong();
+  }
+
+  public String readString(String tag) throws IOException {
+    in.skipType();
+    return in.readString();
+  }
+
+  /** Consumes the type code of the list that encodes a record. */
+  public void startRecord(String tag) throws IOException {
+    in.skipType();
+  }
+
+  /**
+   * Consumes a vector's type code and returns an index over its element
+   * count.
+   */
+  public Index startVector(String tag) throws IOException {
+    in.skipType();
+    return new TypedBytesIndex(in.readVectorHeader());
+  }
+
+  /**
+   * Consumes a map's type code and returns an index over its entry count.
+   */
+  public Index startMap(String tag) throws IOException {
+    in.skipType();
+    return new TypedBytesIndex(in.readMapHeader());
+  }
+
+  /**
+   * Consumes the end marker that closes a record's list.
+   *
+   * <p>If the marker is left unread, the next reader's skipType() consumes
+   * the marker in place of that value's own type code. The value would then
+   * be read starting at its type code, corrupting every read that follows a
+   * nested record.
+   */
+  public void endRecord(String tag) throws IOException {
+    in.skipType();
+  }
+
+  /** Vectors carry an explicit length and have no footer to consume. */
+  public void endVector(String tag) throws IOException {}
+
+  /** Maps carry an explicit length and have no footer to consume. */
+  public void endMap(String tag) throws IOException {}
+
+  /** Countdown index over a known number of vector or map elements. */
+  private static final class TypedBytesIndex implements Index {
+    private int nelems;
+
+    private TypedBytesIndex(int nelems) {
+      this.nelems = nelems;
+    }
+
+    public boolean done() {
+      return (nelems <= 0);
+    }
+
+    public void incr() {
+      nelems--;
+    }
+  }
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.TreeMap;
+
+import org.apache.hadoop.record.Buffer;
+import org.apache.hadoop.record.Record;
+import org.apache.hadoop.record.RecordOutput;
+
+/**
+ * Deserialized for records that reads typed bytes.
+ */
+public class TypedBytesRecordOutput implements RecordOutput {
+
+ private TypedBytesOutput out;
+
+ private TypedBytesRecordOutput() {}
+
+ private void setTypedBytesOutput(TypedBytesOutput out) {
+ this.out = out;
+ }
+
+ private static ThreadLocal tbOut = new ThreadLocal() {
+ protected synchronized Object initialValue() {
+ return new TypedBytesRecordOutput();
+ }
+ };
+
+ /**
+ * Get a thread-local typed bytes record input for the supplied
+ * {@link TypedBytesOutput}.
+ *
+ * @param out typed bytes output object
+ * @return typed bytes record output corresponding to the supplied
+ * {@link TypedBytesOutput}.
+ */
+ public static TypedBytesRecordOutput get(TypedBytesOutput out) {
+ TypedBytesRecordOutput bout = (TypedBytesRecordOutput) tbOut.get();
+ bout.setTypedBytesOutput(out);
+ return bout;
+ }
+
+ /**
+ * Get a thread-local typed bytes record output for the supplied
+ * {@link DataOutput}.
+ *
+ * @param out data output object
+ * @return typed bytes record output corresponding to the supplied
+ * {@link DataOutput}.
+ */
+ public static TypedBytesRecordOutput get(DataOutput out) {
+ return get(TypedBytesOutput.get(out));
+ }
+
+ /** Creates a new instance of TypedBytesRecordOutput. */
+ public TypedBytesRecordOutput(TypedBytesOutput out) {
+ this.out = out;
+ }
+
+ /** Creates a new instance of TypedBytesRecordOutput. */
+ public TypedBytesRecordOutput(DataOutput out) {
+ this(new TypedBytesOutput(out));
+ }
+
+ public void writeBool(boolean b, String tag) throws IOException {
+ out.writeBool(b);
+ }
+
+ public void writeBuffer(Buffer buf, String tag) throws IOException {
+ out.writeBytes(buf.get());
+ }
+
+ public void writeByte(byte b, String tag) throws IOException {
+ out.writeByte(b);
+ }
+
+ public void writeDouble(double d, String tag) throws IOException {
+ out.writeDouble(d);
+ }
+
+ public void writeFloat(float f, String tag) throws IOException {
+ out.writeFloat(f);
+ }
+
+ public void writeInt(int i, String tag) throws IOException {
+ out.writeInt(i);
+ }
+
+ public void writeLong(long l, String tag) throws IOException {
+ out.writeLong(l);
+ }
+
+ public void writeString(String s, String tag) throws IOException {
+ out.writeString(s);
+ }
+
+ public void startRecord(Record r, String tag) throws IOException {
+ out.writeListHeader();
+ }
+
+ public void startVector(ArrayList v, String tag) throws IOException {
+ out.writeVectorHeader(v.size());
+ }
+
+ public void startMap(TreeMap m, String tag) throws IOException {
+ out.writeMapHeader(m.size());
+ }
+
+ public void endRecord(Record r, String tag) throws IOException {
+ out.writeListFooter();
+ }
+
+ public void endVector(ArrayList v, String tag) throws IOException {}
+
+ public void endMap(TreeMap m, String tag) throws IOException {}
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+ * Writable for typed bytes: a {@link BytesWritable} whose contents are a
+ * single typed bytes encoded value.
+ */
+public class TypedBytesWritable extends BytesWritable {
+
+  /** Create a TypedBytesWritable. */
+  public TypedBytesWritable() {
+    super();
+  }
+
+  /** Create a TypedBytesWritable with a given byte array as initial value. */
+  public TypedBytesWritable(byte[] bytes) {
+    super(bytes);
+  }
+
+  /**
+   * Set the typed bytes from a given Java object.
+   *
+   * <p>A private {@link TypedBytesOutput} is used instead of the
+   * thread-local one from {@link TypedBytesOutput#get}. Rebinding the
+   * shared instance would redirect any writer the caller is holding on this
+   * thread.
+   *
+   * @throws RuntimeException wrapping any IOException from the encoder, or
+   *         if obj's type is not supported by TypedBytesOutput#write
+   */
+  public void setValue(Object obj) {
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      TypedBytesOutput tbo = new TypedBytesOutput(new DataOutputStream(baos));
+      tbo.write(obj);
+      byte[] bytes = baos.toByteArray();
+      set(bytes, 0, bytes.length);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Get the typed bytes as a Java object.
+   *
+   * <p>Only the first getLength() bytes are decoded; the backing array
+   * returned by getBytes() may be longer. A private {@link TypedBytesInput}
+   * is used so that any thread-local reader held by the caller is not
+   * rebound.
+   *
+   * @return the decoded object, or whatever TypedBytesInput#read returns for
+   *         empty input (presumably null)
+   * @throws RuntimeException wrapping any IOException from the decoder
+   */
+  public Object getValue() {
+    try {
+      ByteArrayInputStream bais =
+          new ByteArrayInputStream(getBytes(), 0, getLength());
+      TypedBytesInput tbi = new TypedBytesInput(new DataInputStream(bais));
+      return tbi.read();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Get the type code embedded in the first byte.
+   *
+   * @return the matching {@link Type}, or null if this writable is empty or
+   *         the first byte is not a known type code
+   */
+  public Type getType() {
+    if (getLength() == 0) {
+      return null;
+    }
+    // Type codes are written with DataOutput.write(int), which keeps the low
+    // 8 bits; compare unsigned so codes above 127 are recognised too.
+    int code = getBytes()[0] & 0xFF;
+    for (Type type : Type.values()) {
+      if (type.code == code) {
+        return type;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Generate a suitable string representation: the string form of the
+   * decoded value, or "null" when there is no value.
+   */
+  public String toString() {
+    return String.valueOf(getValue());
+  }
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SortedMapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Provides functionality for reading typed bytes as Writable objects.
+ *
+ * <p>{@link #read()} and {@link #readType()} consume a value's type code.
+ * The individual readX methods read only the payload, so the type code must
+ * already have been consumed before calling them. Each readX(w) variant fills
+ * and returns {@code w} when it is non-null, and a new Writable otherwise.
+ *
+ * @see TypedBytesInput
+ */
+public class TypedBytesWritableInput {
+
+  private TypedBytesInput in;
+
+  private TypedBytesWritableInput() {}
+
+  private void setTypedBytesInput(TypedBytesInput in) {
+    this.in = in;
+  }
+
+  /** Per-thread reusable instance handed out by the static get methods. */
+  private static final ThreadLocal<TypedBytesWritableInput> tbIn =
+      new ThreadLocal<TypedBytesWritableInput>() {
+        @Override
+        protected TypedBytesWritableInput initialValue() {
+          return new TypedBytesWritableInput();
+        }
+      };
+
+  /**
+   * Get a thread-local typed bytes writable input for the supplied
+   * {@link TypedBytesInput}. The returned object is reused for every call
+   * made on the same thread.
+   *
+   * @param in typed bytes input object
+   * @return typed bytes writable input corresponding to the supplied
+   *         {@link TypedBytesInput}.
+   */
+  public static TypedBytesWritableInput get(TypedBytesInput in) {
+    TypedBytesWritableInput bin = tbIn.get();
+    bin.setTypedBytesInput(in);
+    return bin;
+  }
+
+  /**
+   * Get a thread-local typed bytes writable input for the supplied
+   * {@link DataInput}.
+   *
+   * @param in data input object
+   * @return typed bytes writable input corresponding to the supplied
+   *         {@link DataInput}.
+   */
+  public static TypedBytesWritableInput get(DataInput in) {
+    return get(TypedBytesInput.get(in));
+  }
+
+  /** Creates a new instance of TypedBytesWritableInput. */
+  public TypedBytesWritableInput(TypedBytesInput in) {
+    this.in = in;
+  }
+
+  /** Creates a new instance of TypedBytesWritableInput. */
+  public TypedBytesWritableInput(DataInput din) {
+    this(new TypedBytesInput(din));
+  }
+
+  /**
+   * Reads the next typed bytes value (type code and payload) as a Writable.
+   *
+   * @return the value, or null when the underlying input reports no type
+   *         (presumably end of input)
+   * @throws IOException
+   * @throws RuntimeException for types without a Writable mapping here
+   *         (e.g. lists)
+   */
+  public Writable read() throws IOException {
+    Type type = in.readType();
+    if (type == null) {
+      return null;
+    }
+    switch (type) {
+    case BYTES:
+      return readBytes();
+    case BYTE:
+      return readByte();
+    case BOOL:
+      return readBoolean();
+    case INT:
+      return readVInt();
+    case LONG:
+      return readVLong();
+    case FLOAT:
+      return readFloat();
+    case DOUBLE:
+      return readDouble();
+    case STRING:
+      return readText();
+    case VECTOR:
+      return readArray();
+    case MAP:
+      return readMap();
+    default:
+      throw new RuntimeException("unknown type");
+    }
+  }
+
+  /**
+   * Reads the next type code and returns the Writable class that
+   * {@link #read()} would produce for it.
+   *
+   * @return the Writable class, or null when the underlying input reports no
+   *         type
+   * @throws IOException
+   * @throws RuntimeException for types without a Writable mapping here
+   */
+  public Class<? extends Writable> readType() throws IOException {
+    Type type = in.readType();
+    if (type == null) {
+      return null;
+    }
+    switch (type) {
+    case BYTES:
+      return BytesWritable.class;
+    case BYTE:
+      return ByteWritable.class;
+    case BOOL:
+      return BooleanWritable.class;
+    case INT:
+      return VIntWritable.class;
+    case LONG:
+      return VLongWritable.class;
+    case FLOAT:
+      return FloatWritable.class;
+    case DOUBLE:
+      return DoubleWritable.class;
+    case STRING:
+      return Text.class;
+    case VECTOR:
+      return ArrayWritable.class;
+    case MAP:
+      return MapWritable.class;
+    default:
+      throw new RuntimeException("unknown type");
+    }
+  }
+
+  /** Reads a bytes payload into bw, or into a new BytesWritable. */
+  public BytesWritable readBytes(BytesWritable bw) throws IOException {
+    byte[] bytes = in.readBytes();
+    if (bw == null) {
+      bw = new BytesWritable(bytes);
+    } else {
+      bw.set(bytes, 0, bytes.length);
+    }
+    return bw;
+  }
+
+  public BytesWritable readBytes() throws IOException {
+    return readBytes(null);
+  }
+
+  /** Reads a byte payload into bw, or into a new ByteWritable. */
+  public ByteWritable readByte(ByteWritable bw) throws IOException {
+    if (bw == null) {
+      bw = new ByteWritable();
+    }
+    bw.set(in.readByte());
+    return bw;
+  }
+
+  public ByteWritable readByte() throws IOException {
+    return readByte(null);
+  }
+
+  /** Reads a boolean payload into bw, or into a new BooleanWritable. */
+  public BooleanWritable readBoolean(BooleanWritable bw) throws IOException {
+    if (bw == null) {
+      bw = new BooleanWritable();
+    }
+    bw.set(in.readBool());
+    return bw;
+  }
+
+  public BooleanWritable readBoolean() throws IOException {
+    return readBoolean(null);
+  }
+
+  /** Reads an int payload into iw, or into a new IntWritable. */
+  public IntWritable readInt(IntWritable iw) throws IOException {
+    if (iw == null) {
+      iw = new IntWritable();
+    }
+    iw.set(in.readInt());
+    return iw;
+  }
+
+  public IntWritable readInt() throws IOException {
+    return readInt(null);
+  }
+
+  /** Reads an int payload into iw, or into a new VIntWritable. */
+  public VIntWritable readVInt(VIntWritable iw) throws IOException {
+    if (iw == null) {
+      iw = new VIntWritable();
+    }
+    iw.set(in.readInt());
+    return iw;
+  }
+
+  public VIntWritable readVInt() throws IOException {
+    return readVInt(null);
+  }
+
+  /** Reads a long payload into lw, or into a new LongWritable. */
+  public LongWritable readLong(LongWritable lw) throws IOException {
+    if (lw == null) {
+      lw = new LongWritable();
+    }
+    lw.set(in.readLong());
+    return lw;
+  }
+
+  public LongWritable readLong() throws IOException {
+    return readLong(null);
+  }
+
+  /** Reads a long payload into lw, or into a new VLongWritable. */
+  public VLongWritable readVLong(VLongWritable lw) throws IOException {
+    if (lw == null) {
+      lw = new VLongWritable();
+    }
+    lw.set(in.readLong());
+    return lw;
+  }
+
+  public VLongWritable readVLong() throws IOException {
+    return readVLong(null);
+  }
+
+  /** Reads a float payload into fw, or into a new FloatWritable. */
+  public FloatWritable readFloat(FloatWritable fw) throws IOException {
+    if (fw == null) {
+      fw = new FloatWritable();
+    }
+    fw.set(in.readFloat());
+    return fw;
+  }
+
+  public FloatWritable readFloat() throws IOException {
+    return readFloat(null);
+  }
+
+  /** Reads a double payload into dw, or into a new DoubleWritable. */
+  public DoubleWritable readDouble(DoubleWritable dw) throws IOException {
+    if (dw == null) {
+      dw = new DoubleWritable();
+    }
+    dw.set(in.readDouble());
+    return dw;
+  }
+
+  public DoubleWritable readDouble() throws IOException {
+    return readDouble(null);
+  }
+
+  /** Reads a string payload into t, or into a new Text. */
+  public Text readText(Text t) throws IOException {
+    if (t == null) {
+      t = new Text();
+    }
+    t.set(in.readString());
+    return t;
+  }
+
+  public Text readText() throws IOException {
+    return readText(null);
+  }
+
+  /**
+   * Reads a vector payload into aw, or into a new ArrayWritable. Each
+   * element is kept undecoded as a TypedBytesWritable built from
+   * in.readRaw(); aw's previous contents are replaced.
+   *
+   * @throws RuntimeException if aw's value class is not TypedBytesWritable
+   */
+  public ArrayWritable readArray(ArrayWritable aw) throws IOException {
+    if (aw == null) {
+      aw = new ArrayWritable(TypedBytesWritable.class);
+    } else if (!aw.getValueClass().equals(TypedBytesWritable.class)) {
+      throw new RuntimeException("value class has to be TypedBytesWritable");
+    }
+    int length = in.readVectorHeader();
+    Writable[] writables = new Writable[length];
+    for (int i = 0; i < length; i++) {
+      writables[i] = new TypedBytesWritable(in.readRaw());
+    }
+    aw.set(writables);
+    return aw;
+  }
+
+  public ArrayWritable readArray() throws IOException {
+    return readArray(null);
+  }
+
+  /**
+   * Reads a map payload into mw, or into a new MapWritable. Entries already
+   * present in mw are kept.
+   *
+   * @throws IOException if the input ends before every announced entry has
+   *         been read
+   */
+  public MapWritable readMap(MapWritable mw) throws IOException {
+    if (mw == null) {
+      mw = new MapWritable();
+    }
+    int length = in.readMapHeader();
+    for (int i = 0; i < length; i++) {
+      Writable key = readEntryPart("key");
+      Writable value = readEntryPart("value");
+      mw.put(key, value);
+    }
+    return mw;
+  }
+
+  public MapWritable readMap() throws IOException {
+    return readMap(null);
+  }
+
+  /**
+   * Reads a map payload into mw, or into a new SortedMapWritable. Entries
+   * already present in mw are kept.
+   *
+   * @throws IOException if the input ends before every announced entry has
+   *         been read
+   * @throws ClassCastException if a key is not a WritableComparable
+   */
+  public SortedMapWritable readSortedMap(SortedMapWritable mw)
+      throws IOException {
+    if (mw == null) {
+      mw = new SortedMapWritable();
+    }
+    int length = in.readMapHeader();
+    for (int i = 0; i < length; i++) {
+      WritableComparable key = (WritableComparable) readEntryPart("key");
+      Writable value = readEntryPart("value");
+      mw.put(key, value);
+    }
+    return mw;
+  }
+
+  public SortedMapWritable readSortedMap() throws IOException {
+    return readSortedMap(null);
+  }
+
+  /**
+   * Reads one key or value of a map entry. A null from read() is reported as
+   * an IOException rather than being passed on to the map's put method.
+   */
+  private Writable readEntryPart(String what) throws IOException {
+    Writable w = read();
+    if (w == null) {
+      throw new IOException("missing map " + what + " in typed bytes input");
+    }
+    return w;
+  }
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SortedMapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.record.Record;
+
+/**
+ * Provides functionality for writing Writable objects as typed bytes.
+ *
+ * @see TypedBytesOutput
+ */
+public class TypedBytesWritableOutput {
+
+ private TypedBytesOutput out;
+
+ private TypedBytesWritableOutput() {}
+
+ private void setTypedBytesOutput(TypedBytesOutput out) {
+ this.out = out;
+ }
+
+ private static ThreadLocal tbOut = new ThreadLocal() {
+ protected synchronized Object initialValue() {
+ return new TypedBytesWritableOutput();
+ }
+ };
+
+ /**
+ * Get a thread-local typed bytes writable input for the supplied
+ * {@link TypedBytesOutput}.
+ *
+ * @param out typed bytes output object
+ * @return typed bytes writable output corresponding to the supplied
+ * {@link TypedBytesOutput}.
+ */
+ public static TypedBytesWritableOutput get(TypedBytesOutput out) {
+ TypedBytesWritableOutput bout = (TypedBytesWritableOutput) tbOut.get();
+ bout.setTypedBytesOutput(out);
+ return bout;
+ }
+
+ /**
+ * Get a thread-local typed bytes writable output for the supplied
+ * {@link DataOutput}.
+ *
+ * @param out data output object
+ * @return typed bytes writable output corresponding to the supplied
+ * {@link DataOutput}.
+ */
+ public static TypedBytesWritableOutput get(DataOutput out) {
+ return get(TypedBytesOutput.get(out));
+ }
+
+ /** Creates a new instance of TypedBytesWritableOutput. */
+ public TypedBytesWritableOutput(TypedBytesOutput out) {
+ this.out = out;
+ }
+
+ /** Creates a new instance of TypedBytesWritableOutput. */
+ public TypedBytesWritableOutput(DataOutput dout) {
+ this(new TypedBytesOutput(dout));
+ }
+
+ public void write(Writable w) throws IOException {
+ if (w instanceof TypedBytesWritable) {
+ writeTypedBytes((TypedBytesWritable) w);
+ } else if (w instanceof BytesWritable) {
+ writeBytes((BytesWritable) w);
+ } else if (w instanceof ByteWritable) {
+ writeByte((ByteWritable) w);
+ } else if (w instanceof BooleanWritable) {
+ writeBoolean((BooleanWritable) w);
+ } else if (w instanceof IntWritable) {
+ writeInt((IntWritable) w);
+ } else if (w instanceof VIntWritable) {
+ writeVInt((VIntWritable) w);
+ } else if (w instanceof LongWritable) {
+ writeLong((LongWritable) w);
+ } else if (w instanceof VLongWritable) {
+ writeVLong((VLongWritable) w);
+ } else if (w instanceof FloatWritable) {
+ writeFloat((FloatWritable) w);
+ } else if (w instanceof DoubleWritable) {
+ writeDouble((DoubleWritable) w);
+ } else if (w instanceof Text) {
+ writeText((Text) w);
+ } else if (w instanceof ArrayWritable) {
+ writeArray((ArrayWritable) w);
+ } else if (w instanceof MapWritable) {
+ writeMap((MapWritable) w);
+ } else if (w instanceof SortedMapWritable) {
+ writeSortedMap((SortedMapWritable) w);
+ } else if (w instanceof Record) {
+ writeRecord((Record) w);
+ } else {
+ writeWritable(w); // last resort
+ }
+ }
+
+ public void writeTypedBytes(TypedBytesWritable tbw) throws IOException {
+ out.writeRaw(tbw.getBytes(), 0, tbw.getLength());
+ }
+
+ public void writeBytes(BytesWritable bw) throws IOException {
+ byte[] bytes = Arrays.copyOfRange(bw.getBytes(), 0, bw.getLength());
+ out.writeBytes(bytes);
+ }
+
+ public void writeByte(ByteWritable bw) throws IOException {
+ out.writeByte(bw.get());
+ }
+
+ public void writeBoolean(BooleanWritable bw) throws IOException {
+ out.writeBool(bw.get());
+ }
+
+ public void writeInt(IntWritable iw) throws IOException {
+ out.writeInt(iw.get());
+ }
+
+ public void writeVInt(VIntWritable viw) throws IOException {
+ out.writeInt(viw.get());
+ }
+
+ public void writeLong(LongWritable lw) throws IOException {
+ out.writeLong(lw.get());
+ }
+
+ public void writeVLong(VLongWritable vlw) throws IOException {
+ out.writeLong(vlw.get());
+ }
+
+ public void writeFloat(FloatWritable fw) throws IOException {
+ out.writeFloat(fw.get());
+ }
+
+ public void writeDouble(DoubleWritable dw) throws IOException {
+ out.writeDouble(dw.get());
+ }
+
+ public void writeText(Text t) throws IOException {
+ out.writeString(t.toString());
+ }
+
+ public void writeArray(ArrayWritable aw) throws IOException {
+ Writable[] writables = aw.get();
+ out.writeVectorHeader(writables.length);
+ for (Writable writable : writables) {
+ write(writable);
+ }
+ }
+
+ public void writeMap(MapWritable mw) throws IOException {
+ out.writeMapHeader(mw.size());
+ for (Map.Entry<Writable, Writable> entry : mw.entrySet()) {
+ write(entry.getKey());
+ write(entry.getValue());
+ }
+ }
+
+ public void writeSortedMap(SortedMapWritable smw) throws IOException {
+ out.writeMapHeader(smw.size());
+ for (Map.Entry<WritableComparable, Writable> entry : smw.entrySet()) {
+ write(entry.getKey());
+ write(entry.getValue());
+ }
+ }
+
+ public void writeRecord(Record r) throws IOException {
+ r.serialize(TypedBytesRecordOutput.get(out));
+ }
+
+ public void writeWritable(Writable w) throws IOException {
+ out.writeVectorHeader(2);
+ out.writeString(w.getClass().getName());
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ w.write(dos);
+ dos.close();
+ out.writeBytes(baos.toByteArray());
+ }
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html Sat Mar 10 01:49:27 2012
@@ -0,0 +1,66 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+
+Typed bytes are sequences of bytes in which the first byte is a type code. They are especially useful as a
+(simple and very straightforward) binary format for transferring data to and from Hadoop Streaming programs.
+
+<h3>Type Codes</h3>
+
+Each typed bytes sequence starts with an unsigned byte that contains the type code. Possible values are:
+<p>
+<table border="1" cellpadding="2">
+<tr><th>Code</th><th>Type</th></tr>
+<tr><td><i>0</i></td><td>A sequence of bytes.</td></tr>
+<tr><td><i>1</i></td><td>A byte.</td></tr>
+<tr><td><i>2</i></td><td>A boolean.</td></tr>
+<tr><td><i>3</i></td><td>An integer.</td></tr>
+<tr><td><i>4</i></td><td>A long.</td></tr>
+<tr><td><i>5</i></td><td>A float.</td></tr>
+<tr><td><i>6</i></td><td>A double.</td></tr>
+<tr><td><i>7</i></td><td>A string.</td></tr>
+<tr><td><i>8</i></td><td>A vector.</td></tr>
+<tr><td><i>9</i></td><td>A list.</td></tr>
+<tr><td><i>10</i></td><td>A map.</td></tr>
+</table>
+</p>
+
+<h3>Subsequent Bytes</h3>
+
+These are the subsequent bytes for the different type codes (everything is big-endian and unpadded):
+<p>
+<table border="1" cellpadding="2">
+<tr><th>Code</th><th>Subsequent Bytes</th></tr>
+<tr><td><i>0</i></td><td>&lt;32-bit signed integer&gt; &lt;as many bytes as indicated by the integer&gt;</td></tr>
+<tr><td><i>1</i></td><td>&lt;signed byte&gt;</td></tr>
+<tr><td><i>2</i></td><td>&lt;signed byte (<i>0</i> = <i>false</i> and <i>1</i> = <i>true</i>)&gt;</td></tr>
+<tr><td><i>3</i></td><td>&lt;32-bit signed integer&gt;</td></tr>
+<tr><td><i>4</i></td><td>&lt;64-bit signed integer&gt;</td></tr>
+<tr><td><i>5</i></td><td>&lt;32-bit IEEE floating point number&gt;</td></tr>
+<tr><td><i>6</i></td><td>&lt;64-bit IEEE floating point number&gt;</td></tr>
+<tr><td><i>7</i></td><td>&lt;32-bit signed integer&gt; &lt;as many UTF-8 bytes as indicated by the integer&gt;</td></tr>
+<tr><td><i>8</i></td><td>&lt;32-bit signed integer&gt; &lt;as many typed bytes sequences as indicated by the integer&gt;</td></tr>
+<tr><td><i>9</i></td><td>&lt;variable number of typed bytes sequences&gt; &lt;<i>255</i> written as an unsigned byte&gt;</td></tr>
+<tr><td><i>10</i></td><td>&lt;32-bit signed integer&gt; &lt;as many (key-value) pairs of typed bytes sequences as indicated by the integer&gt;</td></tr>
+</table>
+</p>
+
+</body>
+</html>
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * Test streaming mapper that writes length-prefixed binary records.
+ *
+ * <p>Each line read from stdin is split on a regular expression. For every
+ * resulting part, a (part, 1) pair is written to stdout. Keys are written as
+ * a 32-bit length followed by UTF-8 bytes. Values are written as a 32-bit
+ * length of 4 followed by a serialized {@link IntWritable}.
+ */
+public class RawBytesMapApp {
+  private final String find;
+  private final DataOutputStream dos;
+
+  /**
+   * @param find regular expression used to split each input line
+   */
+  public RawBytesMapApp(String find) {
+    this.find = find;
+    dos = new DataOutputStream(System.out);
+  }
+
+  /**
+   * Processes all of stdin and flushes the emitted records to stdout.
+   *
+   * @throws IOException if reading stdin or writing stdout fails
+   */
+  public void go() throws IOException {
+    // Decode stdin as UTF-8 explicitly so it matches the UTF-8 encoding used
+    // by writeString, independent of the platform default charset.
+    BufferedReader in =
+        new BufferedReader(new InputStreamReader(System.in, "UTF-8"));
+    String line;
+    while ((line = in.readLine()) != null) {
+      for (String part : line.split(find)) {
+        writeString(part); // write key
+        writeInt(1);       // write value
+      }
+    }
+    // Flush through the stream actually used for writing (this also flushes
+    // the wrapped System.out).
+    dos.flush();
+  }
+
+  /**
+   * Entry point. {@code args[0]} is the separator. Any '.' in it is escaped
+   * so a literal dot can be passed instead of the regex wildcard.
+   */
+  public static void main(String[] args) throws IOException {
+    RawBytesMapApp app = new RawBytesMapApp(args[0].replace(".","\\."));
+    app.go();
+  }
+
+  /** Writes a 32-bit byte length followed by the UTF-8 bytes of str. */
+  private void writeString(String str) throws IOException {
+    byte[] bytes = str.getBytes("UTF-8");
+    dos.writeInt(bytes.length);
+    dos.write(bytes);
+  }
+
+  /** Writes a 32-bit length of 4 followed by i as an IntWritable. */
+  private void writeInt(int i) throws IOException {
+    dos.writeInt(4);
+    IntWritable iw = new IntWritable(i);
+    iw.write(dos);
+  }
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * Test streaming reducer that reads length-prefixed (string key, IntWritable
+ * value) records from stdin. It sums the values of consecutive equal keys
+ * and prints one "key\tsum" line per key to stdout.
+ */
+public class RawBytesReduceApp {
+  private final DataInputStream dis;
+
+  public RawBytesReduceApp() {
+    dis = new DataInputStream(System.in);
+  }
+
+  /**
+   * Consumes all records on stdin. Only adjacent equal keys are merged, so
+   * the input is assumed to be grouped by key.
+   *
+   * @throws IOException if stdin is malformed or cannot be read
+   */
+  public void go() throws IOException {
+    String prevKey = null;
+    int sum = 0;
+    String key = readString();
+    while (key != null) {
+      if (prevKey != null && !key.equals(prevKey)) {
+        System.out.println(prevKey + "\t" + sum);
+        sum = 0;
+      }
+      sum += readInt();
+      prevKey = key;
+      key = readString();
+    }
+    // Emit the final group. With empty input there is no group, and
+    // printing here would produce a bogus "null\t0" line.
+    if (prevKey != null) {
+      System.out.println(prevKey + "\t" + sum);
+    }
+    System.out.flush();
+  }
+
+  public static void main(String[] args) throws IOException {
+    RawBytesReduceApp app = new RawBytesReduceApp();
+    app.go();
+  }
+
+  /**
+   * Reads a 32-bit length followed by that many UTF-8 bytes.
+   *
+   * @return the decoded string, or {@code null} when stdin is exhausted
+   * @throws IOException if the length is negative or the data is truncated
+   */
+  private String readString() throws IOException {
+    int length;
+    try {
+      length = dis.readInt();
+    } catch (EOFException eof) {
+      return null;
+    }
+    if (length < 0) {
+      throw new IOException("Invalid negative key length: " + length);
+    }
+    byte[] bytes = new byte[length];
+    dis.readFully(bytes);
+    return new String(bytes, "UTF-8");
+  }
+
+  /** Reads a 32-bit length prefix (ignored) followed by an IntWritable. */
+  private int readInt() throws IOException {
+    dis.readInt(); // ignore (we know it's 4)
+    IntWritable iw = new IntWritable();
+    iw.readFields(dis);
+    return iw.get();
+  }
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.streaming.AutoInputFormat;
+
+import junit.framework.TestCase;
+
+public class TestAutoInputFormat extends TestCase {
+
+  private static final Configuration conf = new Configuration();
+
+  private static final int LINES_COUNT = 3;
+
+  private static final int RECORDS_COUNT = 3;
+
+  private static final int SPLITS_COUNT = 2;
+
+  /**
+   * Writes a text file and a sequence file into one input directory. It
+   * then checks that AutoInputFormat reads each file with the matching
+   * record types and returns every record exactly once.
+   */
+  @SuppressWarnings( { "unchecked", "deprecation" })
+  public void testFormat() throws IOException {
+    JobConf job = new JobConf(conf);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dir = new Path(System.getProperty("test.build.data", ".") + "/mapred");
+    Path txtFile = new Path(dir, "auto.txt");
+    Path seqFile = new Path(dir, "auto.seq");
+
+    fs.delete(dir, true);
+
+    FileInputFormat.setInputPaths(job, dir);
+
+    Writer txtWriter = new OutputStreamWriter(fs.create(txtFile));
+    try {
+      for (int i = 0; i < LINES_COUNT; i++) {
+        txtWriter.write("" + (10 * i));
+        txtWriter.write("\n");
+      }
+    } finally {
+      txtWriter.close();
+    }
+
+    SequenceFile.Writer seqWriter = SequenceFile.createWriter(fs, conf,
+        seqFile, IntWritable.class, LongWritable.class);
+    try {
+      for (int i = 0; i < RECORDS_COUNT; i++) {
+        IntWritable key = new IntWritable(11 * i);
+        LongWritable value = new LongWritable(12 * i);
+        seqWriter.append(key, value);
+      }
+    } finally {
+      seqWriter.close();
+    }
+
+    // Count the records seen per file type so the test cannot pass
+    // vacuously when no splits or no records are returned.
+    int txtRecords = 0;
+    int seqRecords = 0;
+    AutoInputFormat format = new AutoInputFormat();
+    InputSplit[] splits = format.getSplits(job, SPLITS_COUNT);
+    for (InputSplit split : splits) {
+      RecordReader reader = format.getRecordReader(split, job, Reporter.NULL);
+      Object key = reader.createKey();
+      Object value = reader.createValue();
+      try {
+        while (reader.next(key, value)) {
+          if (key instanceof LongWritable) {
+            // records from auto.txt
+            assertEquals("Wrong value class.", Text.class, value.getClass());
+            assertTrue("Invalid value", Integer.parseInt(((Text) value)
+                .toString()) % 10 == 0);
+            txtRecords++;
+          } else {
+            // records from auto.seq
+            assertEquals("Wrong key class.", IntWritable.class, key.getClass());
+            assertEquals("Wrong value class.", LongWritable.class, value
+                .getClass());
+            assertTrue("Invalid key.", ((IntWritable) key).get() % 11 == 0);
+            assertTrue("Invalid value.", ((LongWritable) value).get() % 12 == 0);
+            seqRecords++;
+          }
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    assertEquals("Wrong number of text records.", LINES_COUNT, txtRecords);
+    assertEquals("Wrong number of sequence file records.", RECORDS_COUNT,
+        seqRecords);
+  }
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.streaming.DumpTypedBytes;
+import org.apache.hadoop.typedbytes.TypedBytesInput;
+
+import junit.framework.TestCase;
+
+public class TestDumpTypedBytes extends TestCase {
+
+  /**
+   * Writes 100 text lines to a MiniDFSCluster and runs
+   * {@link DumpTypedBytes} on the directory. It then checks that the
+   * captured stdout holds 100 typed bytes (Long key, String value) pairs.
+   */
+  public void testDumping() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    FileSystem fs = null;
+    PrintStream psBackup = System.out;
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    PrintStream psOut = new PrintStream(out);
+
+    try {
+      fs = cluster.getFileSystem();
+      // Redirect stdout inside the try so the finally block always restores
+      // it and shuts the cluster down, even if setup fails.
+      System.setOut(psOut);
+      DumpTypedBytes dumptb = new DumpTypedBytes(conf);
+
+      Path root = new Path("/typedbytestest");
+      assertTrue(fs.mkdirs(root));
+      assertTrue(fs.exists(root));
+      OutputStreamWriter writer = new OutputStreamWriter(fs.create(new Path(
+          root, "test.txt")));
+      try {
+        for (int i = 0; i < 100; i++) {
+          writer.write("" + (10 * i) + "\n");
+        }
+      } finally {
+        writer.close();
+      }
+
+      String[] args = new String[1];
+      args[0] = "/typedbytestest";
+      int ret = dumptb.run(args);
+      assertEquals("Return value != 0.", 0, ret);
+      psOut.flush(); // make sure everything written is in `out`
+
+      ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+      TypedBytesInput tbinput = new TypedBytesInput(new DataInputStream(in));
+      int counter = 0;
+      Object key = tbinput.read();
+      while (key != null) {
+        assertEquals(Long.class, key.getClass()); // offset
+        Object value = tbinput.read();
+        assertEquals(String.class, value.getClass());
+        assertTrue("Invalid output.",
+            Integer.parseInt(value.toString()) % 10 == 0);
+        counter++;
+        key = tbinput.read();
+      }
+      assertEquals("Wrong number of outputs.", 100, counter);
+    } finally {
+      if (fs != null) {
+        try {
+          fs.close();
+        } catch (Exception ignored) {
+          // best-effort: stdout restore and cluster shutdown must still run
+        }
+      }
+      System.setOut(psBackup);
+      cluster.shutdown();
+    }
+  }
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.typedbytes.TypedBytesOutput;
+import org.apache.hadoop.typedbytes.TypedBytesWritable;
+
+import junit.framework.TestCase;
+
+public class TestLoadTypedBytes extends TestCase {
+
+  /**
+   * Feeds 100 typed bytes (Long key, String value) pairs to
+   * {@link LoadTypedBytes} via stdin. It then checks that the resulting
+   * sequence file holds 100 matching TypedBytesWritable records.
+   */
+  public void testLoading() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    FileSystem fs = null;
+    InputStream isBackup = System.in;
+    SequenceFile.Reader reader = null;
+
+    try {
+      fs = cluster.getFileSystem();
+
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      TypedBytesOutput tboutput = new TypedBytesOutput(new DataOutputStream(out));
+      for (int i = 0; i < 100; i++) {
+        tboutput.write(Long.valueOf(i)); // key
+        tboutput.write("" + (10 * i));   // value
+      }
+      // Redirect stdin inside the try so the finally block always restores it.
+      ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+      System.setIn(in);
+      LoadTypedBytes loadtb = new LoadTypedBytes(conf);
+
+      Path root = new Path("/typedbytestest");
+      assertTrue(fs.mkdirs(root));
+      assertTrue(fs.exists(root));
+
+      String[] args = new String[1];
+      args[0] = "/typedbytestest/test.seq";
+      int ret = loadtb.run(args);
+      assertEquals("Return value != 0.", 0, ret);
+
+      Path file = new Path(root, "test.seq");
+      assertTrue(fs.exists(file));
+      reader = new SequenceFile.Reader(fs, file, conf);
+      int counter = 0;
+      TypedBytesWritable key = new TypedBytesWritable();
+      TypedBytesWritable value = new TypedBytesWritable();
+      while (reader.next(key, value)) {
+        assertEquals(Long.class, key.getValue().getClass());
+        assertEquals(String.class, value.getValue().getClass());
+        assertTrue("Invalid record.",
+            Integer.parseInt(value.toString()) % 10 == 0);
+        counter++;
+      }
+      assertEquals("Wrong number of records.", 100, counter);
+    } finally {
+      // Close the reader before its FileSystem.
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (Exception ignored) {
+          // best-effort cleanup
+        }
+      }
+      if (fs != null) {
+        try {
+          fs.close();
+        } catch (Exception ignored) {
+          // best-effort: stdin restore and cluster shutdown must still run
+        }
+      }
+      System.setIn(isBackup);
+      cluster.shutdown();
+    }
+  }
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import junit.framework.TestCase;
+
+public class TestRawBytesStreaming extends TestCase {
+
+ protected File INPUT_FILE = new File("input.txt");
+ protected File OUTPUT_DIR = new File("out");
+ protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+ protected String map = StreamUtil.makeJavaCommand(RawBytesMapApp.class, new String[]{"."});
+ protected String reduce = StreamUtil.makeJavaCommand(RawBytesReduceApp.class, new String[0]);
+ protected String outputExpect = "are\t3\nblue\t1\nbunnies\t1\npink\t1\nred\t1\nroses\t1\nviolets\t1\n";
+
+ public TestRawBytesStreaming() throws IOException {
+ UtilTest utilTest = new UtilTest(getClass().getName());
+ utilTest.checkUserDir();
+ utilTest.redirectIfAntJunit();
+ }
+
+ protected void createInput() throws IOException {
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+ out.write(input.getBytes("UTF-8"));
+ out.close();
+ }
+
+ protected String[] genArgs() {
+ return new String[] {
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper", map,
+ "-reducer", reduce,
+ "-jobconf", "keep.failed.task.files=true",
+ "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+ "-jobconf", "stream.map.output=rawbytes",
+ "-jobconf", "stream.reduce.input=rawbytes",
+ "-verbose"
+ };
+ }
+
+ public void testCommandLine() throws Exception {
+ try {
+ try {
+ OUTPUT_DIR.getAbsoluteFile().delete();
+ } catch (Exception e) {
+ }
+
+ createInput();
+ OUTPUT_DIR.delete();
+
+ // During tests, the default Configuration will use a local mapred
+ // So don't specify -config or -cluster
+ StreamJob job = new StreamJob();
+ job.setConf(new Configuration());
+ job.run(genArgs());
+ File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+ String output = StreamUtil.slurp(outFile);
+ outFile.delete();
+ System.out.println(" map=" + map);
+ System.out.println("reduce=" + reduce);
+ System.err.println("outEx1=" + outputExpect);
+ System.err.println(" out1=" + output);
+ assertEquals(outputExpect, output);
+ } finally {
+ File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+ INPUT_FILE.delete();
+ outFileCRC.delete();
+ OUTPUT_DIR.getAbsoluteFile().delete();
+ }
+ }
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import junit.framework.TestCase;
+
+public class TestTypedBytesStreaming extends TestCase {
+
+ protected File INPUT_FILE = new File("input.txt");
+ protected File OUTPUT_DIR = new File("out");
+ protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+ protected String map = StreamUtil.makeJavaCommand(TypedBytesMapApp.class, new String[]{"."});
+ protected String reduce = StreamUtil.makeJavaCommand(TypedBytesReduceApp.class, new String[0]);
+ protected String outputExpect = "are\t3\nred\t1\nblue\t1\npink\t1\nroses\t1\nbunnies\t1\nviolets\t1\n";
+
+ public TestTypedBytesStreaming() throws IOException {
+ UtilTest utilTest = new UtilTest(getClass().getName());
+ utilTest.checkUserDir();
+ utilTest.redirectIfAntJunit();
+ }
+
+ protected void createInput() throws IOException {
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+ out.write(input.getBytes("UTF-8"));
+ out.close();
+ }
+
+ protected String[] genArgs() {
+ return new String[] {
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper", map,
+ "-reducer", reduce,
+ "-jobconf", "keep.failed.task.files=true",
+ "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+ "-io", "typedbytes"
+ };
+ }
+
+ public void testCommandLine() throws Exception {
+ try {
+ try {
+ OUTPUT_DIR.getAbsoluteFile().delete();
+ } catch (Exception e) {
+ }
+
+ createInput();
+ OUTPUT_DIR.delete();
+
+ // During tests, the default Configuration will use a local mapred
+ // So don't specify -config or -cluster
+ StreamJob job = new StreamJob();
+ job.setConf(new Configuration());
+ job.run(genArgs());
+ File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+ String output = StreamUtil.slurp(outFile);
+ outFile.delete();
+ System.out.println(" map=" + map);
+ System.out.println("reduce=" + reduce);
+ System.err.println("outEx1=" + outputExpect);
+ System.err.println(" out1=" + output);
+ assertEquals(outputExpect, output);
+ } finally {
+ File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+ INPUT_FILE.delete();
+ outFileCRC.delete();
+ OUTPUT_DIR.getAbsoluteFile().delete();
+ }
+ }
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.typedbytes.TypedBytesInput;
+import org.apache.hadoop.typedbytes.TypedBytesOutput;
+
+/**
+ * Streaming mapper used by the typed bytes tests. It splits each input value
+ * into words and emits (word, 1) pairs in typed bytes format.
+ */
+public class TypedBytesMapApp {
+
+  /** Regular expression on which each input value is split. */
+  private final String find;
+
+  /**
+   * @param find regular expression used with {@link String#split(String)} on
+   *     each input value
+   */
+  public TypedBytesMapApp(String find) {
+    this.find = find;
+  }
+
+  /**
+   * Reads typed bytes (key, value) pairs from stdin until the key reads as
+   * null. For every piece of {@code value.toString()} split on {@code find},
+   * it writes the pair (piece, 1) to stdout as typed bytes. For each input
+   * record it also prints a {@code reporter:counter:UserCounters,InputLines,1}
+   * line to stderr.
+   */
+  public void go() throws IOException {
+    TypedBytesInput tbinput = new TypedBytesInput(new DataInputStream(System.in));
+    TypedBytesOutput tboutput = new TypedBytesOutput(new DataOutputStream(System.out));
+
+    // The key's contents are never used; reading it raw only serves to detect
+    // end of input (null).
+    Object key = tbinput.readRaw();
+    while (key != null) {
+      Object value = tbinput.read();
+      for (String part : value.toString().split(find)) {
+        tboutput.write(part); // write key
+        tboutput.write(1); // write value
+      }
+      System.err.println("reporter:counter:UserCounters,InputLines,1");
+      key = tbinput.readRaw();
+    }
+
+    System.out.flush();
+  }
+
+  /**
+   * Entry point. {@code args[0]} is a literal separator string (for example
+   * "."). It is quoted so that any regex metacharacter in it is matched
+   * literally; the previous code escaped only '.'.
+   */
+  public static void main(String[] args) throws IOException {
+    TypedBytesMapApp app = new TypedBytesMapApp(java.util.regex.Pattern.quote(args[0]));
+    app.go();
+  }
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.typedbytes.TypedBytesInput;
+import org.apache.hadoop.typedbytes.TypedBytesOutput;
+
+/**
+ * Streaming reducer used by the typed bytes tests. It sums the Integer values
+ * of each run of equal consecutive keys and emits (key, sum) in typed bytes
+ * format.
+ */
+public class TypedBytesReduceApp {
+
+  /**
+   * Reads alternating typed bytes keys and Integer values from stdin until a
+   * key reads as null, and writes one (key, sum) pair per group of equal
+   * consecutive keys to stdout. Nothing is written when the input is empty.
+   */
+  public void go() throws IOException {
+    TypedBytesInput tbinput = new TypedBytesInput(new DataInputStream(System.in));
+    TypedBytesOutput tboutput = new TypedBytesOutput(new DataOutputStream(System.out));
+
+    Object prevKey = null;
+    int sum = 0;
+    Object key = tbinput.read();
+    while (key != null) {
+      if (prevKey != null && !key.equals(prevKey)) {
+        // Key changed: emit the total of the group that just ended.
+        tboutput.write(prevKey); // write key
+        tboutput.write(sum); // write value
+        sum = 0;
+      }
+      sum += (Integer) tbinput.read();
+      prevKey = key;
+      key = tbinput.read();
+    }
+    // Emit the last group. Without this guard, empty input would write a null
+    // key (which TypedBytesOutput presumably cannot encode) and a bogus sum of 0.
+    if (prevKey != null) {
+      tboutput.write(prevKey);
+      tboutput.write(sum);
+    }
+
+    System.out.flush();
+  }
+
+  public static void main(String[] args) throws IOException {
+    TypedBytesReduceApp app = new TypedBytesReduceApp();
+    app.go();
+  }
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.record.Buffer;
+import org.apache.hadoop.record.RecRecord0;
+import org.apache.hadoop.record.RecRecord1;
+
+import junit.framework.TestCase;
+
+public class TestIO extends TestCase {
+
+  /** Scratch file shared by the tests; deleted again in tearDown(). */
+  private File tmpfile;
+
+  protected void setUp() throws Exception {
+    this.tmpfile = new File(System.getProperty("test.build.data", "/tmp"),
+        "typedbytes.bin");
+  }
+
+  protected void tearDown() throws Exception {
+    tmpfile.delete();
+  }
+
+  /**
+   * Round-trips buffer, byte, boolean, int, long, float, double, string,
+   * vector, list and map values through TypedBytesOutput/TypedBytesInput.
+   * It checks both write()/read() and readRaw()/writeRaw().
+   */
+  public void testIO() throws IOException {
+    ArrayList<Object> vector = new ArrayList<Object>();
+    vector.add("test");
+    vector.add(false);
+    vector.add(12345);
+    List<Object> list = new LinkedList<Object>();
+    list.add("another test");
+    list.add(true);
+    list.add(123456789L);
+    Map<Object, Object> map = new HashMap<Object, Object>();
+    map.put("one", 1);
+    map.put("vector", vector);
+    Object[] objects = new Object[] {
+      new Buffer(new byte[] { 1, 2, 3, 4 }),
+      (byte) 123, true, 12345, 123456789L, (float) 1.2, 1.234,
+      "random string", vector, list, map
+    };
+
+    FileOutputStream ostream = new FileOutputStream(tmpfile);
+    DataOutputStream dostream = new DataOutputStream(ostream);
+    TypedBytesOutput out = new TypedBytesOutput(dostream);
+    for (Object obj : objects) {
+      out.write(obj);
+    }
+    dostream.close();
+    ostream.close();
+
+    // Pass 1: decode every value directly with read().
+    FileInputStream istream = new FileInputStream(tmpfile);
+    DataInputStream distream = new DataInputStream(istream);
+    TypedBytesInput in = new TypedBytesInput(distream);
+    for (Object obj : objects) {
+      assertEquals(obj, in.read());
+    }
+    distream.close();
+    istream.close();
+
+    // Pass 2: fetch each value's encoding with readRaw() and check it decodes
+    // to the value. Then re-emit it with writeRaw() and check that output too.
+    istream = new FileInputStream(tmpfile);
+    distream = new DataInputStream(istream);
+    in = new TypedBytesInput(distream);
+    for (Object obj : objects) {
+      byte[] bytes = in.readRaw();
+      assertEquals(obj, decode(bytes));
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream rawOut = new DataOutputStream(baos);
+      TypedBytesOutput tbout = new TypedBytesOutput(rawOut);
+      tbout.writeRaw(bytes);
+      rawOut.flush();
+      // Decode what writeRaw() produced rather than the input buffer;
+      // otherwise writeRaw() runs but its output is never verified.
+      assertEquals(obj, decode(baos.toByteArray()));
+    }
+    distream.close();
+    istream.close();
+  }
+
+  /** Serializes a RecRecord1 with TypedBytesRecordOutput and reads it back with TypedBytesRecordInput. */
+  public void testRecordIO() throws IOException {
+    RecRecord1 r1 = newTestRecord();
+
+    FileOutputStream ostream = new FileOutputStream(tmpfile);
+    DataOutputStream dostream = new DataOutputStream(ostream);
+    TypedBytesRecordOutput out = TypedBytesRecordOutput.get(dostream);
+    r1.serialize(out, "");
+    dostream.close();
+    ostream.close();
+
+    FileInputStream istream = new FileInputStream(tmpfile);
+    DataInputStream distream = new DataInputStream(istream);
+    TypedBytesRecordInput in = TypedBytesRecordInput.get(distream);
+    RecRecord1 r2 = new RecRecord1();
+    r2.deserialize(in, "");
+    distream.close();
+    istream.close();
+    assertEquals(r1, r2);
+  }
+
+  /**
+   * Writes Writables, a TypedBytesWritable, an ArrayWritable, a MapWritable
+   * and a record with TypedBytesWritableOutput. It then reads them back with
+   * TypedBytesWritableInput and, for the record, with plain TypedBytesInput.
+   */
+  public void testWritableIO() throws IOException {
+    Writable[] vectorValues = new Writable[] {
+      new Text("test1"), new Text("test2"), new Text("test3")
+    };
+    ArrayWritable vector = new ArrayWritable(Text.class, vectorValues);
+    MapWritable map = new MapWritable();
+    map.put(new Text("one"), new VIntWritable(1));
+    map.put(new Text("two"), new VLongWritable(2));
+    Writable[] writables = new Writable[] {
+      new BytesWritable(new byte[] { 1, 2, 3, 4 }),
+      new ByteWritable((byte) 123), new BooleanWritable(true),
+      new VIntWritable(12345), new VLongWritable(123456789L),
+      new FloatWritable((float) 1.2), new DoubleWritable(1.234),
+      new Text("random string")
+    };
+    TypedBytesWritable tbw = new TypedBytesWritable();
+    tbw.setValue("typed bytes text");
+    RecRecord1 r1 = newTestRecord();
+
+    FileOutputStream ostream = new FileOutputStream(tmpfile);
+    DataOutputStream dostream = new DataOutputStream(ostream);
+    TypedBytesWritableOutput out = new TypedBytesWritableOutput(dostream);
+    for (Writable w : writables) {
+      out.write(w);
+    }
+    out.write(tbw);
+    out.write(vector);
+    out.write(map);
+    out.write(r1);
+    dostream.close();
+    ostream.close();
+
+    FileInputStream istream = new FileInputStream(tmpfile);
+    DataInputStream distream = new DataInputStream(istream);
+
+    TypedBytesWritableInput in = new TypedBytesWritableInput(distream);
+    for (Writable w : writables) {
+      assertEquals(w, in.read());
+    }
+
+    assertEquals(tbw.getValue().toString(), in.read().toString());
+
+    assertEquals(ArrayWritable.class, in.readType());
+    ArrayWritable aw = in.readArray();
+    Writable[] writables1 = vector.get(), writables2 = aw.get();
+    assertEquals(writables1.length, writables2.length);
+    for (int i = 0; i < writables1.length; i++) {
+      assertEquals(((Text) writables1[i]).toString(),
+          ((TypedBytesWritable) writables2[i]).getValue());
+    }
+    assertEquals(MapWritable.class, in.readType());
+
+    MapWritable mw = in.readMap();
+    assertEquals(map.entrySet(), mw.entrySet());
+
+    // The record is read back field by field as plain typed bytes.
+    assertEquals(Type.LIST, TypedBytesInput.get(distream).readType());
+    assertEquals(r1.getBoolVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getByteVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getIntVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getLongVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getFloatVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getDoubleVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getStringVal(), TypedBytesInput.get(distream).read());
+    // Skip the remaining fields until read() returns null; the last non-null
+    // value is the nested record, decoded as a list.
+    Object prevObj = null, obj = TypedBytesInput.get(distream).read();
+    while (obj != null) {
+      prevObj = obj;
+      obj = TypedBytesInput.get(distream).read();
+    }
+    List<?> recList = (List<?>) prevObj;
+    assertEquals(r1.getRecordVal().getStringVal(), recList.get(0));
+
+    distream.close();
+    istream.close();
+  }
+
+  /** Decodes a single typed bytes value from {@code bytes}. */
+  private static Object decode(byte[] bytes) throws IOException {
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+    return new TypedBytesInput(dis).read();
+  }
+
+  /** Builds the RecRecord1 fixture (with a nested RecRecord0) shared by the record tests. */
+  private static RecRecord1 newTestRecord() {
+    RecRecord1 r1 = new RecRecord1();
+    r1.setBoolVal(true);
+    r1.setByteVal((byte) 0x66);
+    r1.setFloatVal(3.145F);
+    r1.setDoubleVal(1.5234);
+    r1.setIntVal(-4567);
+    r1.setLongVal(-2367L);
+    r1.setStringVal("random text");
+    r1.setBufferVal(new Buffer());
+    r1.setVectorVal(new ArrayList<String>());
+    r1.setMapVal(new TreeMap<String, String>());
+    RecRecord0 r0 = new RecRecord0();
+    r0.setStringVal("other random text");
+    r1.setRecordVal(r0);
+    return r1;
+  }
+
+}
Added: hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java?rev=1299136&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java Sat Mar 10 01:49:27 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+public class TestTypedBytesWritable extends TestCase {
+
+ public void testToString() {
+ TypedBytesWritable tbw = new TypedBytesWritable();
+ tbw.setValue(true);
+ assertEquals("true", tbw.toString());
+ tbw.setValue(12345);
+ assertEquals("12345", tbw.toString());
+ tbw.setValue(123456789L);
+ assertEquals("123456789", tbw.toString());
+ tbw.setValue((float) 1.23);
+ assertEquals("1.23", tbw.toString());
+ tbw.setValue(1.23456789);
+ assertEquals("1.23456789", tbw.toString());
+ tbw.setValue("random text");
+ assertEquals("random text", tbw.toString());
+ }
+
+ public void testIO() throws IOException {
+ TypedBytesWritable tbw = new TypedBytesWritable();
+ tbw.setValue(12345);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutput dout = new DataOutputStream(baos);
+ tbw.write(dout);
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ DataInput din = new DataInputStream(bais);
+ TypedBytesWritable readTbw = new TypedBytesWritable();
+ readTbw.readFields(din);
+ assertEquals(tbw, readTbw);
+ }
+
+}