You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2011/12/05 21:05:51 UTC

svn commit: r1210600 [3/16] - in /incubator/accumulo/trunk/contrib/accumulo_sample: ./ ingest/ ingest/src/main/java/aggregator/ ingest/src/main/java/ingest/ ingest/src/main/java/iterator/ ingest/src/main/java/normalizer/ ingest/src/main/java/protobuf/ ...

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/LongLineRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/LongLineRecordReader.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/LongLineRecordReader.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/LongLineRecordReader.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package reader;
 
 import java.io.IOException;
@@ -34,109 +34,103 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.util.LineReader;
 
 /**
- * A copy of {@link LineRecordReader} which does not discard lines longer
- * than "mapred.linerecordreader.maxlength". Instead, it returns them, leaving
- * it to the mapper to decide what to do with it. It also does not treat '\r'
- * (CR) characters as new lines -- it uses {@link LfLineReader} instead of
- * {@link LineReader} to read lines.
+ * A copy of {@link LineRecordReader} which does not discard lines longer than "mapred.linerecordreader.maxlength". Instead, it returns them, leaving it to the
+ * mapper to decide what to do with it. It also does not treat '\r' (CR) characters as new lines -- it uses {@link LfLineReader} instead of {@link LineReader}
+ * to read lines.
  */
-public class LongLineRecordReader extends RecordReader<LongWritable, Text> {
-	private CompressionCodecFactory compressionCodecs = null;
-	private long start;
-	private long pos;
-	private long end;
-	private LfLineReader in;
-	private int maxLineLength;
-	private LongWritable key = null;
-	private Text value = null;
-
-	@Override
-	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
-			throws IOException {
-		FileSplit split = (FileSplit) genericSplit;
-		Configuration job = context.getConfiguration();
-		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
-				Integer.MAX_VALUE);
-		start = split.getStart();
-		end = start + split.getLength();
-		final Path file = split.getPath();
-		compressionCodecs = new CompressionCodecFactory(job);
-		final CompressionCodec codec = compressionCodecs.getCodec(file);
-
-		// open the file and seek to the start of the split
-		FileSystem fs = file.getFileSystem(job);
-		FSDataInputStream fileIn = fs.open(split.getPath());
-		boolean skipFirstLine = false;
-		if (codec != null) {
-			in = new LfLineReader(codec.createInputStream(fileIn), job);
-			end = Long.MAX_VALUE;
-		} else {
-			if (start != 0) {
-				skipFirstLine = true;
-				--start;
-				fileIn.seek(start);
-			}
-			in = new LfLineReader(fileIn, job);
-		}
-		if (skipFirstLine) { // skip first line and re-establish "start".
-			start += in.readLine(new Text(), 0, (int) Math.min(
-					Integer.MAX_VALUE, end - start));
-		}
-		this.pos = start;
-	}
-
-	@Override
-	public boolean nextKeyValue() throws IOException {
-		if (key == null) {
-			key = new LongWritable();
-		}
-		key.set(pos);
-		if (value == null) {
-			value = new Text();
-		}
-		int newSize = 0;
-		if (pos < end) {
-			newSize = in.readLine(value, maxLineLength, Math.max((int) Math
-					.min(Integer.MAX_VALUE, end - pos), maxLineLength));
-			if (newSize != 0) {
-				pos += newSize;
-			}
-		}
-		if (newSize == 0) {
-			key = null;
-			value = null;
-			return false;
-		} else {
-			return true;
-		}
-	}
-
-	@Override
-	public LongWritable getCurrentKey() {
-		return key;
-	}
-
-	@Override
-	public Text getCurrentValue() {
-		return value;
-	}
-
-	/**
-	 * Get the progress within the split
-	 */
-	@Override
-	public float getProgress() {
-		if (start == end) {
-			return 0.0f;
-		} else {
-			return Math.min(1.0f, (pos - start) / (float) (end - start));
-		}
-	}
-
-	@Override
-	public synchronized void close() throws IOException {
-		if (in != null) {
-			in.close();
-		}
-	}
+public class LongLineRecordReader extends RecordReader<LongWritable,Text> {
+  private CompressionCodecFactory compressionCodecs = null;
+  private long start;
+  private long pos;
+  private long end;
+  private LfLineReader in;
+  private int maxLineLength;
+  private LongWritable key = null;
+  private Text value = null;
+  
+  @Override
+  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
+    FileSplit split = (FileSplit) genericSplit;
+    Configuration job = context.getConfiguration();
+    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
+    start = split.getStart();
+    end = start + split.getLength();
+    final Path file = split.getPath();
+    compressionCodecs = new CompressionCodecFactory(job);
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    
+    // open the file and seek to the start of the split
+    FileSystem fs = file.getFileSystem(job);
+    FSDataInputStream fileIn = fs.open(split.getPath());
+    boolean skipFirstLine = false;
+    if (codec != null) {
+      in = new LfLineReader(codec.createInputStream(fileIn), job);
+      end = Long.MAX_VALUE;
+    } else {
+      if (start != 0) {
+        skipFirstLine = true;
+        --start;
+        fileIn.seek(start);
+      }
+      in = new LfLineReader(fileIn, job);
+    }
+    if (skipFirstLine) { // skip first line and re-establish "start".
+      start += in.readLine(new Text(), 0, (int) Math.min(Integer.MAX_VALUE, end - start));
+    }
+    this.pos = start;
+  }
+  
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    if (key == null) {
+      key = new LongWritable();
+    }
+    key.set(pos);
+    if (value == null) {
+      value = new Text();
+    }
+    int newSize = 0;
+    if (pos < end) {
+      newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
+      if (newSize != 0) {
+        pos += newSize;
+      }
+    }
+    if (newSize == 0) {
+      key = null;
+      value = null;
+      return false;
+    } else {
+      return true;
+    }
+  }
+  
+  @Override
+  public LongWritable getCurrentKey() {
+    return key;
+  }
+  
+  @Override
+  public Text getCurrentValue() {
+    return value;
+  }
+  
+  /**
+   * Get the progress within the split
+   */
+  @Override
+  public float getProgress() {
+    if (start == end) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (pos - start) / (float) (end - start));
+    }
+  }
+  
+  @Override
+  public synchronized void close() throws IOException {
+    if (in != null) {
+      in.close();
+    }
+  }
 }

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/util/TextUtil.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/util/TextUtil.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/util/TextUtil.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/util/TextUtil.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package util;
 
 import java.nio.ByteBuffer;
@@ -22,78 +22,87 @@ import org.apache.hadoop.io.Text;
 import org.apache.accumulo.core.iterators.aggregation.LongSummation;
 
 public class TextUtil {
-
-	/**
-	 * Appends a null byte followed by the UTF-8 bytes of the given
-	 * string to the given {@link Text}
-	 * @param text the Text to which to append
-	 * @param string the String to append
-	 */
-	public static void textAppend(Text text, String string) {
-		appendNullByte(text);
-		textAppendNoNull(text, string);
-	}
-
-    public static void textAppend(Text text, String string, boolean replaceBadChar) {
-        appendNullByte(text);
-        textAppendNoNull(text, string, replaceBadChar);
+  
+  /**
+   * Appends a null byte followed by the UTF-8 bytes of the given string to the given {@link Text}
+   * 
+   * @param text
+   *          the Text to which to append
+   * @param string
+   *          the String to append
+   */
+  public static void textAppend(Text text, String string) {
+    appendNullByte(text);
+    textAppendNoNull(text, string);
+  }
+  
+  public static void textAppend(Text text, String string, boolean replaceBadChar) {
+    appendNullByte(text);
+    textAppendNoNull(text, string, replaceBadChar);
+  }
+  
+  public static void textAppend(Text t, long s) {
+    t.append(nullByte, 0, 1);
+    t.append(LongSummation.longToBytes(s), 0, 8);
+  }
+  
+  private static final byte[] nullByte = {0};
+  
+  /**
+   * Appends a null byte to the given text
+   * 
+   * @param text
+   *          the text to which to append the null byte
+   */
+  public static void appendNullByte(Text text) {
+    text.append(nullByte, 0, nullByte.length);
+  }
+  
+  /**
+   * Appends the UTF-8 bytes of the given string to the given {@link Text}
+   * 
+   * @param text
+   *          the Text to which to append
+   * @param string
+   *          the String to append
+   */
+  public static void textAppendNoNull(Text t, String s) {
+    textAppendNoNull(t, s, false);
+  }
+  
+  /**
+   * Appends the UTF-8 bytes of the given string to the given {@link Text}
+   * 
+   * @param t
+   * @param s
+   * @param replaceBadChar
+   */
+  public static void textAppendNoNull(Text t, String s, boolean replaceBadChar) {
+    try {
+      ByteBuffer buffer = Text.encode(s, replaceBadChar);
+      t.append(buffer.array(), 0, buffer.limit());
+    } catch (CharacterCodingException cce) {
+      throw new IllegalArgumentException(cce);
     }
-
-	public static void textAppend(Text t, long s) {
-		t.append(nullByte, 0, 1);
-		t.append(LongSummation.longToBytes(s), 0, 8);
-	}
-
-	private static final byte[] nullByte = {0};
-
-	/**
-	 * Appends a null byte to the given text
-	 * @param text the text to which to append the null byte
-	 */
-	public static void appendNullByte(Text text) {
-		text.append(nullByte, 0, nullByte.length);
-	}
-
-	/**
-	 * Appends the UTF-8 bytes of the given string to the given {@link Text}
-	 * @param text the Text to which to append
-	 * @param string the String to append
-	 */
-	public static void textAppendNoNull(Text t, String s) {
-        textAppendNoNull(t, s, false);
-    }
-
-    /**
-     * Appends the UTF-8 bytes of the given string to the given {@link Text}
-     * @param t
-     * @param s
-     * @param replaceBadChar
-     */
-    public static void textAppendNoNull(Text t, String s, boolean replaceBadChar) {
-        try {
-            ByteBuffer buffer = Text.encode(s, replaceBadChar);
-            t.append(buffer.array(), 0, buffer.limit());
-        } catch (CharacterCodingException cce) {
-            throw new IllegalArgumentException(cce);
-        }
+  }
+  
+  /**
+   * Converts the given string its UTF-8 bytes. This uses Hadoop's method for converting string to UTF-8 and is much faster than calling
+   * {@link String#getBytes(String)}.
+   * 
+   * @param string
+   *          the string to convert
+   * @return the UTF-8 representation of the string
+   */
+  public static byte[] toUtf8(String string) {
+    ByteBuffer buffer;
+    try {
+      buffer = Text.encode(string, false);
+    } catch (CharacterCodingException cce) {
+      throw new IllegalArgumentException(cce);
     }
-
-	/**
-	 * Converts the given string its UTF-8 bytes. This uses Hadoop's method
-	 * for converting string to UTF-8 and is much faster
-	 * than calling {@link String#getBytes(String)}.
-	 * @param string the string to convert
-	 * @return the UTF-8 representation of the string
-	 */
-	public static byte[] toUtf8(String string) {
-		ByteBuffer buffer;
-		try {
-			buffer = Text.encode(string, false);
-		} catch (CharacterCodingException cce) {
-			throw new IllegalArgumentException(cce);
-		}
-		byte[] bytes = new byte[buffer.limit()];
-		System.arraycopy(buffer.array(), 0, bytes, 0, bytes.length);
-		return bytes;
-	}
+    byte[] bytes = new byte[buffer.limit()];
+    System.arraycopy(buffer.array(), 0, bytes, 0, bytes.length);
+    return bytes;
+  }
 }

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package aggregator;
 
 import java.util.ArrayList;
@@ -27,146 +27,145 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.iterators.aggregation.Aggregator;
 
 public class GlobalIndexUidAggregatorTest extends TestCase {
-	
-	Aggregator agg = new GlobalIndexUidAggregator();
-	
-
-	private Uid.List.Builder createNewUidList() {
-		return Uid.List.newBuilder();
-	}
-	
-	public void testSingleUid() {
-		agg.reset();
-		Builder b = createNewUidList();
-		b.setCOUNT(1);
-		b.setIGNORE(false);
-		b.addUID(UUID.randomUUID().toString());
-		Uid.List uidList = b.build();
-		Value val = new Value(uidList.toByteArray());
-		agg.collect(val);
-		Value result = agg.aggregate();
-		assertTrue(val.compareTo(result.get()) == 0);
-	}
-	
-	public void testLessThanMax() throws Exception {
-		agg.reset();
-		List<String> savedUUIDs = new ArrayList<String>();
-		for (int i = 0; i < GlobalIndexUidAggregator.MAX - 1; i++) {
-			Builder b = createNewUidList();
-			b.setIGNORE(false);
-			String uuid = UUID.randomUUID().toString();
-			savedUUIDs.add(uuid);
-			b.setCOUNT(i);
-			b.addUID(uuid);
-			Uid.List uidList = b.build();
-			Value val = new Value(uidList.toByteArray());
-			agg.collect(val);
-		}
-		Value result = agg.aggregate();
-		Uid.List resultList = Uid.List.parseFrom(result.get());
-		assertTrue(resultList.getIGNORE() == false);
-		assertTrue(resultList.getUIDCount() == (GlobalIndexUidAggregator.MAX - 1));
-		List<String> resultListUUIDs = resultList.getUIDList();
-		for (String s : savedUUIDs) 
-			assertTrue(resultListUUIDs.contains(s));
-	}
-
-	public void testEqualsMax() throws Exception {
-		agg.reset();
-		List<String> savedUUIDs = new ArrayList<String>();
-		for (int i = 0; i < GlobalIndexUidAggregator.MAX; i++) {
-			Builder b = createNewUidList();
-			b.setIGNORE(false);
-			String uuid = UUID.randomUUID().toString();
-			savedUUIDs.add(uuid);
-			b.setCOUNT(i);
-			b.addUID(uuid);
-			Uid.List uidList = b.build();
-			Value val = new Value(uidList.toByteArray());
-			agg.collect(val);
-		}
-		Value result = agg.aggregate();
-		Uid.List resultList = Uid.List.parseFrom(result.get());
-		assertTrue(resultList.getIGNORE() == false);
-		assertTrue(resultList.getUIDCount() == (GlobalIndexUidAggregator.MAX));
-		List<String> resultListUUIDs = resultList.getUIDList();
-		for (String s : savedUUIDs) 
-			assertTrue(resultListUUIDs.contains(s));
-	}
-
-	public void testMoreThanMax() throws Exception {
-		agg.reset();
-		List<String> savedUUIDs = new ArrayList<String>();
-		for (int i = 0; i < GlobalIndexUidAggregator.MAX + 10; i++) {
-			Builder b = createNewUidList();
-			b.setIGNORE(false);
-			String uuid = UUID.randomUUID().toString();
-			savedUUIDs.add(uuid);
-			b.setCOUNT(1);
-			b.addUID(uuid);
-			Uid.List uidList = b.build();
-			Value val = new Value(uidList.toByteArray());
-			agg.collect(val);
-		}
-		Value result = agg.aggregate();
-		Uid.List resultList = Uid.List.parseFrom(result.get());
-		assertTrue(resultList.getIGNORE() == true);
-		assertTrue(resultList.getUIDCount() == 0);
-		assertTrue(resultList.getCOUNT() == (GlobalIndexUidAggregator.MAX + 10));
-	}
-
-	public void testSeenIgnore() throws Exception {
-		agg.reset();
-		Builder b = createNewUidList();
-		b.setIGNORE(true);
-		b.setCOUNT(0);
-		Uid.List uidList = b.build();
-		Value val = new Value(uidList.toByteArray());
-		agg.collect(val);
-		b = createNewUidList();
-		b.setIGNORE(false);
-		b.setCOUNT(1);
-		b.addUID(UUID.randomUUID().toString());
-		uidList = b.build();
-		val = new Value(uidList.toByteArray());
-		agg.collect(val);
-		Value result = agg.aggregate();
-		Uid.List resultList = Uid.List.parseFrom(result.get());
-		assertTrue(resultList.getIGNORE() == true);
-		assertTrue(resultList.getUIDCount() == 0);
-		assertTrue(resultList.getCOUNT() == 1);
-	}
-	
-	public void testInvalidValueType() throws Exception {
-		agg.reset();
-		Value val = new Value(UUID.randomUUID().toString().getBytes());
-		agg.collect(val);
-		Value result = agg.aggregate();
-		Uid.List resultList = Uid.List.parseFrom(result.get());
-		assertTrue(resultList.getIGNORE() == false);
-		assertTrue(resultList.getUIDCount() == 0);
-		assertTrue(resultList.getCOUNT() == 0);
-	}
-	
-	public void testCount() throws Exception {
-		agg.reset();
-		UUID uuid = UUID.randomUUID();
-		//Collect the same UUID five times.
-		for (int i = 0; i < 5; i++) {
-			Builder b = createNewUidList();
-			b.setCOUNT(1);
-			b.setIGNORE(false);
-			b.addUID(uuid.toString());
-			Uid.List uidList = b.build();
-			Value val = new Value(uidList.toByteArray());
-			agg.collect(val);
-		}
-		Value result = agg.aggregate();
-		Uid.List resultList = Uid.List.parseFrom(result.get());
-		assertTrue(resultList.getIGNORE() == false);
-		assertTrue(resultList.getUIDCount() == 1);
-		assertTrue(resultList.getCOUNT() == 5);
-
-	}
-
+  
+  Aggregator agg = new GlobalIndexUidAggregator();
+  
+  private Uid.List.Builder createNewUidList() {
+    return Uid.List.newBuilder();
+  }
+  
+  public void testSingleUid() {
+    agg.reset();
+    Builder b = createNewUidList();
+    b.setCOUNT(1);
+    b.setIGNORE(false);
+    b.addUID(UUID.randomUUID().toString());
+    Uid.List uidList = b.build();
+    Value val = new Value(uidList.toByteArray());
+    agg.collect(val);
+    Value result = agg.aggregate();
+    assertTrue(val.compareTo(result.get()) == 0);
+  }
+  
+  public void testLessThanMax() throws Exception {
+    agg.reset();
+    List<String> savedUUIDs = new ArrayList<String>();
+    for (int i = 0; i < GlobalIndexUidAggregator.MAX - 1; i++) {
+      Builder b = createNewUidList();
+      b.setIGNORE(false);
+      String uuid = UUID.randomUUID().toString();
+      savedUUIDs.add(uuid);
+      b.setCOUNT(i);
+      b.addUID(uuid);
+      Uid.List uidList = b.build();
+      Value val = new Value(uidList.toByteArray());
+      agg.collect(val);
+    }
+    Value result = agg.aggregate();
+    Uid.List resultList = Uid.List.parseFrom(result.get());
+    assertTrue(resultList.getIGNORE() == false);
+    assertTrue(resultList.getUIDCount() == (GlobalIndexUidAggregator.MAX - 1));
+    List<String> resultListUUIDs = resultList.getUIDList();
+    for (String s : savedUUIDs)
+      assertTrue(resultListUUIDs.contains(s));
+  }
+  
+  public void testEqualsMax() throws Exception {
+    agg.reset();
+    List<String> savedUUIDs = new ArrayList<String>();
+    for (int i = 0; i < GlobalIndexUidAggregator.MAX; i++) {
+      Builder b = createNewUidList();
+      b.setIGNORE(false);
+      String uuid = UUID.randomUUID().toString();
+      savedUUIDs.add(uuid);
+      b.setCOUNT(i);
+      b.addUID(uuid);
+      Uid.List uidList = b.build();
+      Value val = new Value(uidList.toByteArray());
+      agg.collect(val);
+    }
+    Value result = agg.aggregate();
+    Uid.List resultList = Uid.List.parseFrom(result.get());
+    assertTrue(resultList.getIGNORE() == false);
+    assertTrue(resultList.getUIDCount() == (GlobalIndexUidAggregator.MAX));
+    List<String> resultListUUIDs = resultList.getUIDList();
+    for (String s : savedUUIDs)
+      assertTrue(resultListUUIDs.contains(s));
+  }
+  
+  public void testMoreThanMax() throws Exception {
+    agg.reset();
+    List<String> savedUUIDs = new ArrayList<String>();
+    for (int i = 0; i < GlobalIndexUidAggregator.MAX + 10; i++) {
+      Builder b = createNewUidList();
+      b.setIGNORE(false);
+      String uuid = UUID.randomUUID().toString();
+      savedUUIDs.add(uuid);
+      b.setCOUNT(1);
+      b.addUID(uuid);
+      Uid.List uidList = b.build();
+      Value val = new Value(uidList.toByteArray());
+      agg.collect(val);
+    }
+    Value result = agg.aggregate();
+    Uid.List resultList = Uid.List.parseFrom(result.get());
+    assertTrue(resultList.getIGNORE() == true);
+    assertTrue(resultList.getUIDCount() == 0);
+    assertTrue(resultList.getCOUNT() == (GlobalIndexUidAggregator.MAX + 10));
+  }
+  
+  public void testSeenIgnore() throws Exception {
+    agg.reset();
+    Builder b = createNewUidList();
+    b.setIGNORE(true);
+    b.setCOUNT(0);
+    Uid.List uidList = b.build();
+    Value val = new Value(uidList.toByteArray());
+    agg.collect(val);
+    b = createNewUidList();
+    b.setIGNORE(false);
+    b.setCOUNT(1);
+    b.addUID(UUID.randomUUID().toString());
+    uidList = b.build();
+    val = new Value(uidList.toByteArray());
+    agg.collect(val);
+    Value result = agg.aggregate();
+    Uid.List resultList = Uid.List.parseFrom(result.get());
+    assertTrue(resultList.getIGNORE() == true);
+    assertTrue(resultList.getUIDCount() == 0);
+    assertTrue(resultList.getCOUNT() == 1);
+  }
+  
+  public void testInvalidValueType() throws Exception {
+    agg.reset();
+    Value val = new Value(UUID.randomUUID().toString().getBytes());
+    agg.collect(val);
+    Value result = agg.aggregate();
+    Uid.List resultList = Uid.List.parseFrom(result.get());
+    assertTrue(resultList.getIGNORE() == false);
+    assertTrue(resultList.getUIDCount() == 0);
+    assertTrue(resultList.getCOUNT() == 0);
+  }
+  
+  public void testCount() throws Exception {
+    agg.reset();
+    UUID uuid = UUID.randomUUID();
+    // Collect the same UUID five times.
+    for (int i = 0; i < 5; i++) {
+      Builder b = createNewUidList();
+      b.setCOUNT(1);
+      b.setIGNORE(false);
+      b.addUID(uuid.toString());
+      Uid.List uidList = b.build();
+      Value val = new Value(uidList.toByteArray());
+      agg.collect(val);
+    }
+    Value result = agg.aggregate();
+    Uid.List resultList = Uid.List.parseFrom(result.get());
+    assertTrue(resultList.getIGNORE() == false);
+    assertTrue(resultList.getUIDCount() == 1);
+    assertTrue(resultList.getCOUNT() == 5);
+    
+  }
+  
 }

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/TextIndexAggregatorTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/TextIndexAggregatorTest.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/TextIndexAggregatorTest.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/TextIndexAggregatorTest.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package aggregator;
 
 import java.util.List;
@@ -31,112 +31,111 @@ import org.apache.accumulo.core.data.Val
 import com.google.protobuf.InvalidProtocolBufferException;
 
 public class TextIndexAggregatorTest {
-    private TextIndexAggregator aggregator;
-    
-    @Before
-    public void setup() throws Exception {
-        aggregator = new TextIndexAggregator();
-    }
-    
-    @After
-    public void cleanup() {
-        
-    }
-    
-    private TermWeight.Info.Builder createBuilder() {
-        return TermWeight.Info.newBuilder();
-    }
-    
-    @Test
-    public void testSingleValue() throws InvalidProtocolBufferException {
-        aggregator = new TextIndexAggregator();
-        Builder builder = createBuilder();
-        builder.addWordOffset(1);
-        builder.addWordOffset(5);
-        builder.setNormalizedTermFrequency(0.1f);
-        
-        aggregator.collect(new Value(builder.build().toByteArray()));
-        
-        Value result = aggregator.aggregate();
-        
-        TermWeight.Info info = TermWeight.Info.parseFrom(result.get());
-        
-        Assert.assertTrue(info.getNormalizedTermFrequency() == 0.1f);
-        
-        List<Integer> offsets = info.getWordOffsetList();
-        Assert.assertTrue(offsets.size() == 2);
-        Assert.assertTrue(offsets.get(0) == 1);
-        Assert.assertTrue(offsets.get(1) == 5);
-    }
-    
-    @Test
-    public void testAggregateTwoValues() throws InvalidProtocolBufferException {
-        aggregator = new TextIndexAggregator();
-        Builder builder = createBuilder();
-        builder.addWordOffset(1);
-        builder.addWordOffset(5);
-        builder.setNormalizedTermFrequency(0.1f);
-        
-        aggregator.collect(new Value(builder.build().toByteArray()));
-        
-        builder = createBuilder();
-        builder.addWordOffset(3);
-        builder.setNormalizedTermFrequency(0.05f);
-        
-        aggregator.collect(new Value(builder.build().toByteArray()));
-        
-        Value result = aggregator.aggregate();
-        
-        TermWeight.Info info = TermWeight.Info.parseFrom(result.get());
-        
-        Assert.assertTrue(info.getNormalizedTermFrequency() == 0.15f);
-        
-        List<Integer> offsets = info.getWordOffsetList();
-        Assert.assertTrue(offsets.size() == 3);
-        Assert.assertTrue(offsets.get(0) == 1);
-        Assert.assertTrue(offsets.get(1) == 3);
-        Assert.assertTrue(offsets.get(2) == 5);
-    }
-    
-    @Test
-    public void testAggregateManyValues() throws InvalidProtocolBufferException {
-        aggregator = new TextIndexAggregator();
-        
-        Builder builder = createBuilder();
-        builder.addWordOffset(13);
-        builder.addWordOffset(15);
-        builder.addWordOffset(19);
-        builder.setNormalizedTermFrequency(0.12f);
-        
-        aggregator.collect(new Value(builder.build().toByteArray()));
-        
-        builder = createBuilder();
-        builder.addWordOffset(1);
-        builder.addWordOffset(5);
-        builder.setNormalizedTermFrequency(0.1f);
-        
-        aggregator.collect(new Value(builder.build().toByteArray()));
-        
-        builder = createBuilder();
-        builder.addWordOffset(3);
-        builder.setNormalizedTermFrequency(0.05f);
-        
-        aggregator.collect(new Value(builder.build().toByteArray()));
-
-        
-        Value result = aggregator.aggregate();
-        
-        TermWeight.Info info = TermWeight.Info.parseFrom(result.get());
-        
-        Assert.assertTrue(info.getNormalizedTermFrequency() == 0.27f);
-        
-        List<Integer> offsets = info.getWordOffsetList();
-        Assert.assertTrue(offsets.size() == 6);
-        Assert.assertTrue(offsets.get(0) == 1);
-        Assert.assertTrue(offsets.get(1) == 3);
-        Assert.assertTrue(offsets.get(2) == 5);
-        Assert.assertTrue(offsets.get(3) == 13);
-        Assert.assertTrue(offsets.get(4) == 15);
-        Assert.assertTrue(offsets.get(5) == 19);
-    }
+  private TextIndexAggregator aggregator;
+  
+  @Before
+  public void setup() throws Exception {
+    aggregator = new TextIndexAggregator();
+  }
+  
+  @After
+  public void cleanup() {
+    
+  }
+  
+  private TermWeight.Info.Builder createBuilder() {
+    return TermWeight.Info.newBuilder();
+  }
+  
+  @Test
+  public void testSingleValue() throws InvalidProtocolBufferException {
+    aggregator = new TextIndexAggregator();
+    Builder builder = createBuilder();
+    builder.addWordOffset(1);
+    builder.addWordOffset(5);
+    builder.setNormalizedTermFrequency(0.1f);
+    
+    aggregator.collect(new Value(builder.build().toByteArray()));
+    
+    Value result = aggregator.aggregate();
+    
+    TermWeight.Info info = TermWeight.Info.parseFrom(result.get());
+    
+    Assert.assertTrue(info.getNormalizedTermFrequency() == 0.1f);
+    
+    List<Integer> offsets = info.getWordOffsetList();
+    Assert.assertTrue(offsets.size() == 2);
+    Assert.assertTrue(offsets.get(0) == 1);
+    Assert.assertTrue(offsets.get(1) == 5);
+  }
+  
+  @Test
+  public void testAggregateTwoValues() throws InvalidProtocolBufferException {
+    aggregator = new TextIndexAggregator();
+    Builder builder = createBuilder();
+    builder.addWordOffset(1);
+    builder.addWordOffset(5);
+    builder.setNormalizedTermFrequency(0.1f);
+    
+    aggregator.collect(new Value(builder.build().toByteArray()));
+    
+    builder = createBuilder();
+    builder.addWordOffset(3);
+    builder.setNormalizedTermFrequency(0.05f);
+    
+    aggregator.collect(new Value(builder.build().toByteArray()));
+    
+    Value result = aggregator.aggregate();
+    
+    TermWeight.Info info = TermWeight.Info.parseFrom(result.get());
+    
+    Assert.assertTrue(info.getNormalizedTermFrequency() == 0.15f);
+    
+    List<Integer> offsets = info.getWordOffsetList();
+    Assert.assertTrue(offsets.size() == 3);
+    Assert.assertTrue(offsets.get(0) == 1);
+    Assert.assertTrue(offsets.get(1) == 3);
+    Assert.assertTrue(offsets.get(2) == 5);
+  }
+  
+  @Test
+  public void testAggregateManyValues() throws InvalidProtocolBufferException {
+    aggregator = new TextIndexAggregator();
+    
+    Builder builder = createBuilder();
+    builder.addWordOffset(13);
+    builder.addWordOffset(15);
+    builder.addWordOffset(19);
+    builder.setNormalizedTermFrequency(0.12f);
+    
+    aggregator.collect(new Value(builder.build().toByteArray()));
+    
+    builder = createBuilder();
+    builder.addWordOffset(1);
+    builder.addWordOffset(5);
+    builder.setNormalizedTermFrequency(0.1f);
+    
+    aggregator.collect(new Value(builder.build().toByteArray()));
+    
+    builder = createBuilder();
+    builder.addWordOffset(3);
+    builder.setNormalizedTermFrequency(0.05f);
+    
+    aggregator.collect(new Value(builder.build().toByteArray()));
+    
+    Value result = aggregator.aggregate();
+    
+    TermWeight.Info info = TermWeight.Info.parseFrom(result.get());
+    
+    Assert.assertTrue(info.getNormalizedTermFrequency() == 0.27f);
+    
+    List<Integer> offsets = info.getWordOffsetList();
+    Assert.assertTrue(offsets.size() == 6);
+    Assert.assertTrue(offsets.get(0) == 1);
+    Assert.assertTrue(offsets.get(1) == 3);
+    Assert.assertTrue(offsets.get(2) == 5);
+    Assert.assertTrue(offsets.get(3) == 13);
+    Assert.assertTrue(offsets.get(4) == 15);
+    Assert.assertTrue(offsets.get(5) == 19);
+  }
 }

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/StandaloneStatusReporter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/StandaloneStatusReporter.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/StandaloneStatusReporter.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/StandaloneStatusReporter.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package ingest;
 
 import org.apache.hadoop.mapreduce.Counter;
@@ -21,50 +21,50 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.StatusReporter;
 
 public class StandaloneStatusReporter extends StatusReporter {
-	
-	private Counters c = new Counters();
-	
-	private long filesProcessed = 0;
-	private long recordsProcessed = 0;
-
-	public Counters getCounters() {
-		return c;
-	}
-
-	@Override
-	public Counter getCounter(Enum<?> name) {
-		return c.findCounter(name);
-	}
-
-	@Override
-	public Counter getCounter(String group, String name) {
-		return c.findCounter(group, name);
-	}
-
-	@Override
-	public void progress() {
-		// do nothing
-	}
-
-	@Override
-	public void setStatus(String status) {
-		// do nothing
-	}
-
-	public long getFilesProcessed() {
-		return filesProcessed;
-	}
-	
-	public long getRecordsProcessed() {
-		return recordsProcessed;
-	}
-	
-	public void incrementFilesProcessed() {
-		filesProcessed++;
-		recordsProcessed = 0;
-	}
-	
-	public void incrementRecordsProcessed() {
-		recordsProcessed++;
-	}
+  
+  private Counters c = new Counters();
+  
+  private long filesProcessed = 0;
+  private long recordsProcessed = 0;
+  
+  public Counters getCounters() {
+    return c;
+  }
+  
+  @Override
+  public Counter getCounter(Enum<?> name) {
+    return c.findCounter(name);
+  }
+  
+  @Override
+  public Counter getCounter(String group, String name) {
+    return c.findCounter(group, name);
+  }
+  
+  @Override
+  public void progress() {
+    // do nothing
+  }
+  
+  @Override
+  public void setStatus(String status) {
+    // do nothing
+  }
+  
+  public long getFilesProcessed() {
+    return filesProcessed;
+  }
+  
+  public long getRecordsProcessed() {
+    return recordsProcessed;
+  }
+  
+  public void incrementFilesProcessed() {
+    filesProcessed++;
+    recordsProcessed = 0;
+  }
+  
+  public void incrementRecordsProcessed() {
+    recordsProcessed++;
+  }
 }

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/WikipediaMapperTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/WikipediaMapperTest.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/WikipediaMapperTest.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/WikipediaMapperTest.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package ingest;
 
 import java.io.File;
@@ -56,112 +56,111 @@ import org.apache.accumulo.core.security
  * Load some data into mock accumulo
  */
 public class WikipediaMapperTest {
-	
-	private static final String METADATA_TABLE_NAME = "wikiMetadata";
-	
-	private static final String TABLE_NAME = "wiki";
-	
-	private static final String INDEX_TABLE_NAME = "wikiIndex";
-	
-	private static final String RINDEX_TABLE_NAME = "wikiReverseIndex";
-	
-	private class MockAccumuloRecordWriter extends RecordWriter<Text, Mutation> {
-		@Override
-		public void write(Text key, Mutation value) throws IOException, InterruptedException {
-			try {
-				writerMap.get(key).addMutation(value);
-			} catch (MutationsRejectedException e) {
-				throw new IOException("Error adding mutation", e);
-			}
-		}
-
-		@Override
-		public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-				try {
-					for (BatchWriter w : writerMap.values()) {
-						w.flush();
-						w.close();
-					}
-				} catch (MutationsRejectedException e) {
-					throw new IOException("Error closing Batch Writer", e);
-				}
-		}
-		
-	}
-
-	private Connector c = null;
-	private Configuration conf = new Configuration();
-	private HashMap<Text,BatchWriter> writerMap = new HashMap<Text,BatchWriter>();
-
-	@Before
-	public void setup() throws Exception {
-		
-		conf.set(AggregatingRecordReader.START_TOKEN, "<page>");
-		conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
-		conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
-		conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
-
-		
-		MockInstance i = new MockInstance();
-        c = i.getConnector("root", "pass");
-		c.tableOperations().delete(METADATA_TABLE_NAME);
-		c.tableOperations().delete(TABLE_NAME);
-		c.tableOperations().delete(INDEX_TABLE_NAME);
-		c.tableOperations().delete(RINDEX_TABLE_NAME);
-        c.tableOperations().create(METADATA_TABLE_NAME);
-        c.tableOperations().create(TABLE_NAME);
-		c.tableOperations().create(INDEX_TABLE_NAME);
-		c.tableOperations().create(RINDEX_TABLE_NAME);
-
-        writerMap.put(new Text(METADATA_TABLE_NAME), c.createBatchWriter(METADATA_TABLE_NAME, 1000L, 1000L, 1));
-        writerMap.put(new Text(TABLE_NAME), c.createBatchWriter(TABLE_NAME, 1000L, 1000L, 1));
-        writerMap.put(new Text(INDEX_TABLE_NAME), c.createBatchWriter(INDEX_TABLE_NAME, 1000L, 1000L, 1));
-        writerMap.put(new Text(RINDEX_TABLE_NAME), c.createBatchWriter(RINDEX_TABLE_NAME, 1000L, 1000L, 1));
-
-        TaskAttemptID id = new TaskAttemptID();
-		TaskAttemptContext context = new TaskAttemptContext(conf, id);
-
-		RawLocalFileSystem fs = new RawLocalFileSystem();
-		fs.setConf(conf);
-
-		URL url = ClassLoader.getSystemResource("enwiki-20110901-001.xml");
-		Assert.assertNotNull(url);
-		File data = new File(url.toURI());
-		Path tmpFile = new Path(data.getAbsolutePath());
-        
-		//Setup the Mapper
-		InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null);
-		AggregatingRecordReader rr = new AggregatingRecordReader();
-		Path ocPath = new Path(tmpFile, "oc");
-		OutputCommitter oc = new FileOutputCommitter(ocPath, context);
-		fs.deleteOnExit(ocPath);
-		StandaloneStatusReporter sr = new StandaloneStatusReporter();
-		rr.initialize(split, context);
-		MockAccumuloRecordWriter rw = new MockAccumuloRecordWriter();
-		WikipediaMapper mapper = new WikipediaMapper();
-		
-		//Load data into Mock Accumulo
-		Mapper<LongWritable, Text, Text, Mutation>.Context con = mapper.new Context(conf, id, rr, rw, oc, sr, split);
-		mapper.run(con);
-		
-		//Flush and close record writers.
-		rw.close(context);
-		
-	}
-	
-	private void debugQuery(String tableName) throws Exception {
-		Scanner s = c.createScanner(tableName, new Authorizations("all"));
-		Range r = new Range();
-		s.setRange(r);
-		for (Entry<Key,Value> entry : s)
-			System.out.println(entry.getKey().toString() +" " + entry.getValue().toString());
-	}
-
-	@Test
-	public void testViewAllData() throws Exception {
-		debugQuery(METADATA_TABLE_NAME);
-		debugQuery(TABLE_NAME);
-		debugQuery(INDEX_TABLE_NAME);
-		debugQuery(RINDEX_TABLE_NAME);
-	}
+  
+  private static final String METADATA_TABLE_NAME = "wikiMetadata";
+  
+  private static final String TABLE_NAME = "wiki";
+  
+  private static final String INDEX_TABLE_NAME = "wikiIndex";
+  
+  private static final String RINDEX_TABLE_NAME = "wikiReverseIndex";
+  
+  private class MockAccumuloRecordWriter extends RecordWriter<Text,Mutation> {
+    @Override
+    public void write(Text key, Mutation value) throws IOException, InterruptedException {
+      try {
+        writerMap.get(key).addMutation(value);
+      } catch (MutationsRejectedException e) {
+        throw new IOException("Error adding mutation", e);
+      }
+    }
+    
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+      try {
+        for (BatchWriter w : writerMap.values()) {
+          w.flush();
+          w.close();
+        }
+      } catch (MutationsRejectedException e) {
+        throw new IOException("Error closing Batch Writer", e);
+      }
+    }
+    
+  }
+  
+  private Connector c = null;
+  private Configuration conf = new Configuration();
+  private HashMap<Text,BatchWriter> writerMap = new HashMap<Text,BatchWriter>();
+  
+  @Before
+  public void setup() throws Exception {
+    
+    conf.set(AggregatingRecordReader.START_TOKEN, "<page>");
+    conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
+    conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
+    conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
+    
+    MockInstance i = new MockInstance();
+    c = i.getConnector("root", "pass");
+    c.tableOperations().delete(METADATA_TABLE_NAME);
+    c.tableOperations().delete(TABLE_NAME);
+    c.tableOperations().delete(INDEX_TABLE_NAME);
+    c.tableOperations().delete(RINDEX_TABLE_NAME);
+    c.tableOperations().create(METADATA_TABLE_NAME);
+    c.tableOperations().create(TABLE_NAME);
+    c.tableOperations().create(INDEX_TABLE_NAME);
+    c.tableOperations().create(RINDEX_TABLE_NAME);
+    
+    writerMap.put(new Text(METADATA_TABLE_NAME), c.createBatchWriter(METADATA_TABLE_NAME, 1000L, 1000L, 1));
+    writerMap.put(new Text(TABLE_NAME), c.createBatchWriter(TABLE_NAME, 1000L, 1000L, 1));
+    writerMap.put(new Text(INDEX_TABLE_NAME), c.createBatchWriter(INDEX_TABLE_NAME, 1000L, 1000L, 1));
+    writerMap.put(new Text(RINDEX_TABLE_NAME), c.createBatchWriter(RINDEX_TABLE_NAME, 1000L, 1000L, 1));
+    
+    TaskAttemptID id = new TaskAttemptID();
+    TaskAttemptContext context = new TaskAttemptContext(conf, id);
+    
+    RawLocalFileSystem fs = new RawLocalFileSystem();
+    fs.setConf(conf);
+    
+    URL url = ClassLoader.getSystemResource("enwiki-20110901-001.xml");
+    Assert.assertNotNull(url);
+    File data = new File(url.toURI());
+    Path tmpFile = new Path(data.getAbsolutePath());
+    
+    // Setup the Mapper
+    InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null);
+    AggregatingRecordReader rr = new AggregatingRecordReader();
+    Path ocPath = new Path(tmpFile, "oc");
+    OutputCommitter oc = new FileOutputCommitter(ocPath, context);
+    fs.deleteOnExit(ocPath);
+    StandaloneStatusReporter sr = new StandaloneStatusReporter();
+    rr.initialize(split, context);
+    MockAccumuloRecordWriter rw = new MockAccumuloRecordWriter();
+    WikipediaMapper mapper = new WikipediaMapper();
+    
+    // Load data into Mock Accumulo
+    Mapper<LongWritable,Text,Text,Mutation>.Context con = mapper.new Context(conf, id, rr, rw, oc, sr, split);
+    mapper.run(con);
+    
+    // Flush and close record writers.
+    rw.close(context);
+    
+  }
+  
+  private void debugQuery(String tableName) throws Exception {
+    Scanner s = c.createScanner(tableName, new Authorizations("all"));
+    Range r = new Range();
+    s.setRange(r);
+    for (Entry<Key,Value> entry : s)
+      System.out.println(entry.getKey().toString() + " " + entry.getValue().toString());
+  }
+  
+  @Test
+  public void testViewAllData() throws Exception {
+    debugQuery(METADATA_TABLE_NAME);
+    debugQuery(TABLE_NAME);
+    debugQuery(INDEX_TABLE_NAME);
+    debugQuery(RINDEX_TABLE_NAME);
+  }
 }

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/normalizer/testNumberNormalizer.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/normalizer/testNumberNormalizer.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/normalizer/testNumberNormalizer.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/normalizer/testNumberNormalizer.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package normalizer;
 
 import static org.junit.Assert.assertTrue;
@@ -21,75 +21,69 @@ import static org.junit.Assert.assertTru
 import org.junit.Test;
 
 public class testNumberNormalizer {
-
-	
-	@Test
-	public void test1() throws Exception {
-		NumberNormalizer nn = new NumberNormalizer();
-		
-		String n1 = nn.normalizeFieldValue(null, "1");
-		String n2 = nn.normalizeFieldValue(null, "1.00000000");
-		
-		assertTrue( n1.compareTo(n2) < 0);
-		
-	}
-	
-	@Test
-	public void test2() {
-		NumberNormalizer nn = new NumberNormalizer();
-		
-		String n1 = nn.normalizeFieldValue(null, "-1.0");
-		String n2 = nn.normalizeFieldValue(null, "1.0");
-
-		assertTrue( n1.compareTo(n2) < 0 );
-		
-	}
-	
-	
-	@Test
-	public void test3(){
-		NumberNormalizer nn = new NumberNormalizer();
-		String n1 = nn.normalizeFieldValue(null, "-0.0001");
-		String n2 = nn.normalizeFieldValue(null, "0");
-		String n3 = nn.normalizeFieldValue(null, "0.00001");
-
-		assertTrue((n1.compareTo(n2) < 0) && (n2.compareTo(n3) < 0));
-	}
-	
-	
-	
-	@Test
-	public void test4(){
-		NumberNormalizer nn = new NumberNormalizer();
-		String nn1 = nn.normalizeFieldValue(null, Integer.toString(Integer.MAX_VALUE));
-		String nn2 = nn.normalizeFieldValue(null, Integer.toString(Integer.MAX_VALUE-1));
-		
-		assertTrue( (nn2.compareTo(nn1) < 0));
-		
-	}
-	
-	
-	@Test
-	public void test5(){
-		NumberNormalizer nn = new NumberNormalizer();
-		String nn1 = nn.normalizeFieldValue(null, "-0.001");
-		String nn2 = nn.normalizeFieldValue(null, "-0.0009");
-		String nn3 = nn.normalizeFieldValue(null, "-0.00090");
-	
-		assertTrue((nn3.compareTo(nn2) == 0) && (nn2.compareTo(nn1) > 0));
-		
-	}
-	
-	@Test
-	public void test6(){
-		NumberNormalizer nn = new NumberNormalizer();
-		String nn1 = nn.normalizeFieldValue(null, "00.0");		
-		String nn2 = nn.normalizeFieldValue(null, "0");
-		String nn3 = nn.normalizeFieldValue(null, "0.0");
-	
-		
-		assertTrue((nn3.compareTo(nn2) == 0) && (nn2.compareTo(nn1) == 0));
-		
-	}
-	
+  
+  @Test
+  public void test1() throws Exception {
+    NumberNormalizer nn = new NumberNormalizer();
+    
+    String n1 = nn.normalizeFieldValue(null, "1");
+    String n2 = nn.normalizeFieldValue(null, "1.00000000");
+    
+    assertTrue(n1.compareTo(n2) < 0);
+    
+  }
+  
+  @Test
+  public void test2() {
+    NumberNormalizer nn = new NumberNormalizer();
+    
+    String n1 = nn.normalizeFieldValue(null, "-1.0");
+    String n2 = nn.normalizeFieldValue(null, "1.0");
+    
+    assertTrue(n1.compareTo(n2) < 0);
+    
+  }
+  
+  @Test
+  public void test3() {
+    NumberNormalizer nn = new NumberNormalizer();
+    String n1 = nn.normalizeFieldValue(null, "-0.0001");
+    String n2 = nn.normalizeFieldValue(null, "0");
+    String n3 = nn.normalizeFieldValue(null, "0.00001");
+    
+    assertTrue((n1.compareTo(n2) < 0) && (n2.compareTo(n3) < 0));
+  }
+  
+  @Test
+  public void test4() {
+    NumberNormalizer nn = new NumberNormalizer();
+    String nn1 = nn.normalizeFieldValue(null, Integer.toString(Integer.MAX_VALUE));
+    String nn2 = nn.normalizeFieldValue(null, Integer.toString(Integer.MAX_VALUE - 1));
+    
+    assertTrue((nn2.compareTo(nn1) < 0));
+    
+  }
+  
+  @Test
+  public void test5() {
+    NumberNormalizer nn = new NumberNormalizer();
+    String nn1 = nn.normalizeFieldValue(null, "-0.001");
+    String nn2 = nn.normalizeFieldValue(null, "-0.0009");
+    String nn3 = nn.normalizeFieldValue(null, "-0.00090");
+    
+    assertTrue((nn3.compareTo(nn2) == 0) && (nn2.compareTo(nn1) > 0));
+    
+  }
+  
+  @Test
+  public void test6() {
+    NumberNormalizer nn = new NumberNormalizer();
+    String nn1 = nn.normalizeFieldValue(null, "00.0");
+    String nn2 = nn.normalizeFieldValue(null, "0");
+    String nn3 = nn.normalizeFieldValue(null, "0.0");
+    
+    assertTrue((nn3.compareTo(nn2) == 0) && (nn2.compareTo(nn1) == 0));
+    
+  }
+  
 }

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/reader/AggregatingRecordReaderTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/reader/AggregatingRecordReaderTest.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/reader/AggregatingRecordReaderTest.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/reader/AggregatingRecordReaderTest.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package reader;
 
 import static org.junit.Assert.*;
@@ -43,293 +43,241 @@ import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
 public class AggregatingRecordReaderTest {
-	
-	public static class MyErrorHandler implements ErrorHandler {
-
-		@Override
-		public void error(SAXParseException exception) throws SAXException {
-			//System.out.println(exception.getMessage());
-		}
-
-		@Override
-		public void fatalError(SAXParseException exception) throws SAXException {
-			//System.out.println(exception.getMessage());
-		}
-
-		@Override
-		public void warning(SAXParseException exception) throws SAXException {
-			//System.out.println(exception.getMessage());
-		}
-		
-	}
-
-	private static final String xml1 =  "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
-	                                    "<doc>\n" +
-										"  <a>A</a>\n" +
-										"  <b>B</b>\n" +
-										"</doc>\n" +
-										"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
-										"<doc>\n" +
-										"  <a>C</a>\n" +
-										"  <b>D</b>\n" +
-										"</doc>\n" +
-										"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
-										"<doc>\n" +
-										"  <a>E</a>\n" +
-										"  <b>F</b>\n" +
-										"</doc>\n";
-
-	private static final String xml2 =  "  <b>B</b>\n" +
-										"</doc>\n" +
-										"<doc>\n" +
-										"  <a>C</a>\n" +
-										"  <b>D</b>\n" +
-										"</doc>\n" +
-										"<doc>\n" +
-										"  <a>E</a>\n" +
-										"  <b>F</b>\n" +
-										"</doc>\n";
-
-	private static final String xml3 =  "<doc>\n" +
-										"  <a>A</a>\n" +
-										"  <b>B</b>\n" +
-										"</doc>\n" +
-										"<doc>\n" +
-										"  <a>C</a>\n" +
-										"  <b>D</b>\n" +
-										"</doc>\n" +
-										"<doc>\n" +
-										"  <a>E</a>\n";
-
-	private static final String xml4 =  "<doc>" +
-										"  <a>A</a>" +
-										"  <b>B</b>" +
-										"</doc>" +
-										"<doc>" +
-										"  <a>C</a>" +
-										"  <b>D</b>" +
-										"</doc>" +
-										"<doc>" +
-										"  <a>E</a>" +
-										"  <b>F</b>" +
-										"</doc>";
-	
-	private static final String xml5 =  "<doc attr=\"G\">" +
-										"  <a>A</a>" +
-										"  <b>B</b>" +
-										"</doc>" +
-										"<doc>" +
-										"  <a>C</a>" +
-										"  <b>D</b>" +
-										"</doc>" +
-										"<doc attr=\"H\"/>" +
-										"<doc>" +
-										"  <a>E</a>" +
-										"  <b>F</b>" +
-										"</doc>" +
-										"<doc attr=\"I\"/>";
-
-
-	private Configuration conf = null;
-	private TaskAttemptContext ctx = null;
-	private static DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-	private XPathFactory xpFactory = XPathFactory.newInstance();
-	private XPathExpression EXPR_A = null;
-	private XPathExpression EXPR_B = null;
-	private XPathExpression EXPR_ATTR = null;
-
-
-	@Before
-	public void setUp() throws Exception {
-		conf = new Configuration();
-		conf.set(AggregatingRecordReader.START_TOKEN, "<doc");
-		conf.set(AggregatingRecordReader.END_TOKEN, "</doc>");
-		conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(true));
-		ctx = new TaskAttemptContext(conf, new TaskAttemptID());		
-		XPath xp = xpFactory.newXPath();
-		EXPR_A = xp.compile("/doc/a");
-		EXPR_B = xp.compile("/doc/b");
-		EXPR_ATTR = xp.compile("/doc/@attr");
-	}
-	
-	public File createFile (String data) throws Exception {
-		//Write out test file
-		File f = File.createTempFile("aggReaderTest", ".xml");
-		f.deleteOnExit();
-		FileWriter writer = new FileWriter(f);
-		writer.write(data);
-		writer.flush();
-		writer.close();
-		return f;
-	}
-	
-	private void testXML(Text xml, String aValue, String bValue, String attrValue) throws Exception {
-		StringReader reader = new StringReader(xml.toString());
-		InputSource source = new InputSource(reader);
-
-		DocumentBuilder parser = factory.newDocumentBuilder();
-		parser.setErrorHandler(new MyErrorHandler());
-		Document root = parser.parse(source);
-		assertNotNull(root);
-		
-		reader = new StringReader(xml.toString());
-		source = new InputSource(reader);
-		assertEquals(EXPR_A.evaluate(source), aValue);
-
-		reader = new StringReader(xml.toString());
-		source = new InputSource(reader);
-		assertEquals(EXPR_B.evaluate(source), bValue);
-		
-		reader = new StringReader(xml.toString());
-		source = new InputSource(reader);
-		assertEquals(EXPR_ATTR.evaluate(source), attrValue);
-	}
-	
-	@Test
-	public void testIncorrectArgs() throws Exception {
-		File f = createFile(xml1);
-		
-		//Create FileSplit
-		Path p = new Path(f.toURI().toString());
-		FileSplit split = new FileSplit(p, 0, f.length(), null);
-		AggregatingRecordReader reader = new AggregatingRecordReader();
-		try {
-			//Clear the values for BEGIN and STOP TOKEN
-			conf.set(AggregatingRecordReader.START_TOKEN, null);
-			conf.set(AggregatingRecordReader.END_TOKEN, null);
-			reader.initialize(split, ctx);
-			//If we got here, then the code didnt throw an exception
-			fail();
-		} catch (Exception e) {
-			//Do nothing, we succeeded
-			f = null;
-		}
-		reader.close();
-	}
-	
-	@Test
-	public void testCorrectXML() throws Exception {
-		File f = createFile(xml1);
-		
-		//Create FileSplit
-		Path p = new Path(f.toURI().toString());
-		FileSplit split = new FileSplit(p, 0, f.length(), null);
-		
-		//Initialize the RecordReader
-		AggregatingRecordReader reader = new AggregatingRecordReader();
-		reader.initialize(split, ctx);
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "A", "B", "");
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "C", "D", "");
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "E", "F", "");
-		assertTrue(!reader.nextKeyValue());
-		
-	}
-
-	@Test
-	public void testPartialXML() throws Exception {
-		File f = createFile(xml2);
-		
-		//Create FileSplit
-		Path p = new Path(f.toURI().toString());
-		FileSplit split = new FileSplit(p, 0, f.length(), null);
-		
-		//Initialize the RecordReader
-		AggregatingRecordReader reader = new AggregatingRecordReader();
-		reader.initialize(split, ctx);
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "C", "D", "");
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "E", "F", "");
-		assertTrue(!reader.nextKeyValue());
-	}
-	
-	public void testPartialXML2WithNoPartialRecordsReturned() throws Exception {
-		conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(false));
-		File f = createFile(xml3);
-		
-		//Create FileSplit
-		Path p = new Path(f.toURI().toString());
-		FileSplit split = new FileSplit(p, 0, f.length(), null);
-		
-		//Initialize the RecordReader
-		AggregatingRecordReader reader = new AggregatingRecordReader();
-		reader.initialize(split, ctx);
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "A", "B", "");
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "C", "D", "");
-		assertTrue(!reader.nextKeyValue());
-	}
-
-	@Test
-	public void testPartialXML2() throws Exception {
-		File f = createFile(xml3);
-		
-		//Create FileSplit
-		Path p = new Path(f.toURI().toString());
-		FileSplit split = new FileSplit(p, 0, f.length(), null);
-		
-		//Initialize the RecordReader
-		AggregatingRecordReader reader = new AggregatingRecordReader();
-		reader.initialize(split, ctx);
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "A", "B", "");
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "C", "D", "");
-		assertTrue(reader.nextKeyValue());
-		try {
-			testXML(reader.getCurrentValue(), "E", "", "");
-			fail("Fragment returned, and it somehow passed XML parsing.");
-		} catch (SAXParseException e) {
-			// ignore
-		}
-		assertTrue(!reader.nextKeyValue());
-	}
-
-	@Test
-	public void testLineSplitting() throws Exception {
-		File f = createFile(xml4);
-		
-		//Create FileSplit
-		Path p = new Path(f.toURI().toString());
-		FileSplit split = new FileSplit(p, 0, f.length(), null);
-		
-		//Initialize the RecordReader
-		AggregatingRecordReader reader = new AggregatingRecordReader();
-		reader.initialize(split, ctx);
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "A", "B", "");
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "C", "D", "");
-		assertTrue(reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "E", "F", "");
-		assertTrue(!reader.nextKeyValue());
-	}
-	
-	@Test
-	public void testNoEndTokenHandling() throws Exception {
-		File f = createFile(xml5);
-		//Create FileSplit
-		Path p = new Path(f.toURI().toString());
-		FileSplit split = new FileSplit(p, 0, f.length(), null);
-		
-		//Initialize the RecordReader
-		AggregatingRecordReader reader = new AggregatingRecordReader();
-		reader.initialize(split, ctx);
-		assertTrue("Not enough records returned.", reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "A", "B", "G");
-		assertTrue("Not enough records returned.", reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "C", "D", "");
-		assertTrue("Not enough records returned.", reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "", "", "H");
-		assertTrue("Not enough records returned.", reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "E", "F", "");
-		assertTrue("Not enough records returned.", reader.nextKeyValue());
-		testXML(reader.getCurrentValue(), "", "", "I");
-		assertTrue("Too many records returned.", !reader.nextKeyValue());
-	}
-
+  
+  public static class MyErrorHandler implements ErrorHandler {
+    
+    @Override
+    public void error(SAXParseException exception) throws SAXException {
+      // System.out.println(exception.getMessage());
+    }
+    
+    @Override
+    public void fatalError(SAXParseException exception) throws SAXException {
+      // System.out.println(exception.getMessage());
+    }
+    
+    @Override
+    public void warning(SAXParseException exception) throws SAXException {
+      // System.out.println(exception.getMessage());
+    }
+    
+  }
+  
+  private static final String xml1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<doc>\n" + "  <a>A</a>\n" + "  <b>B</b>\n" + "</doc>\n"
+      + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n" + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
+      + "<doc>\n" + "  <a>E</a>\n" + "  <b>F</b>\n" + "</doc>\n";
+  
+  private static final String xml2 = "  <b>B</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>E</a>\n"
+      + "  <b>F</b>\n" + "</doc>\n";
+  
+  private static final String xml3 = "<doc>\n" + "  <a>A</a>\n" + "  <b>B</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n"
+      + "<doc>\n" + "  <a>E</a>\n";
+  
+  private static final String xml4 = "<doc>" + "  <a>A</a>" + "  <b>B</b>" + "</doc>" + "<doc>" + "  <a>C</a>" + "  <b>D</b>" + "</doc>" + "<doc>"
+      + "  <a>E</a>" + "  <b>F</b>" + "</doc>";
+  
+  private static final String xml5 = "<doc attr=\"G\">" + "  <a>A</a>" + "  <b>B</b>" + "</doc>" + "<doc>" + "  <a>C</a>" + "  <b>D</b>" + "</doc>"
+      + "<doc attr=\"H\"/>" + "<doc>" + "  <a>E</a>" + "  <b>F</b>" + "</doc>" + "<doc attr=\"I\"/>";
+  
+  private Configuration conf = null;
+  private TaskAttemptContext ctx = null;
+  private static DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+  private XPathFactory xpFactory = XPathFactory.newInstance();
+  private XPathExpression EXPR_A = null;
+  private XPathExpression EXPR_B = null;
+  private XPathExpression EXPR_ATTR = null;
+  
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set(AggregatingRecordReader.START_TOKEN, "<doc");
+    conf.set(AggregatingRecordReader.END_TOKEN, "</doc>");
+    conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(true));
+    ctx = new TaskAttemptContext(conf, new TaskAttemptID());
+    XPath xp = xpFactory.newXPath();
+    EXPR_A = xp.compile("/doc/a");
+    EXPR_B = xp.compile("/doc/b");
+    EXPR_ATTR = xp.compile("/doc/@attr");
+  }
+  
+  public File createFile(String data) throws Exception {
+    // Write out test file
+    File f = File.createTempFile("aggReaderTest", ".xml");
+    f.deleteOnExit();
+    FileWriter writer = new FileWriter(f);
+    writer.write(data);
+    writer.flush();
+    writer.close();
+    return f;
+  }
+  
+  private void testXML(Text xml, String aValue, String bValue, String attrValue) throws Exception {
+    StringReader reader = new StringReader(xml.toString());
+    InputSource source = new InputSource(reader);
+    
+    DocumentBuilder parser = factory.newDocumentBuilder();
+    parser.setErrorHandler(new MyErrorHandler());
+    Document root = parser.parse(source);
+    assertNotNull(root);
+    
+    reader = new StringReader(xml.toString());
+    source = new InputSource(reader);
+    assertEquals(EXPR_A.evaluate(source), aValue);
+    
+    reader = new StringReader(xml.toString());
+    source = new InputSource(reader);
+    assertEquals(EXPR_B.evaluate(source), bValue);
+    
+    reader = new StringReader(xml.toString());
+    source = new InputSource(reader);
+    assertEquals(EXPR_ATTR.evaluate(source), attrValue);
+  }
+  
+  @Test
+  public void testIncorrectArgs() throws Exception {
+    File f = createFile(xml1);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    try {
+      // Clear the values for BEGIN and STOP TOKEN
+      conf.set(AggregatingRecordReader.START_TOKEN, null);
+      conf.set(AggregatingRecordReader.END_TOKEN, null);
+      reader.initialize(split, ctx);
+      // If we got here, then the code didnt throw an exception
+      fail();
+    } catch (Exception e) {
+      // Do nothing, we succeeded
+      f = null;
+    }
+    reader.close();
+  }
+  
+  @Test
+  public void testCorrectXML() throws Exception {
+    File f = createFile(xml1);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "A", "B", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "E", "F", "");
+    assertTrue(!reader.nextKeyValue());
+    
+  }
+  
+  @Test
+  public void testPartialXML() throws Exception {
+    File f = createFile(xml2);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "E", "F", "");
+    assertTrue(!reader.nextKeyValue());
+  }
+  
+  public void testPartialXML2WithNoPartialRecordsReturned() throws Exception {
+    conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(false));
+    File f = createFile(xml3);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "A", "B", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue(!reader.nextKeyValue());
+  }
+  
+  @Test
+  public void testPartialXML2() throws Exception {
+    File f = createFile(xml3);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "A", "B", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue(reader.nextKeyValue());
+    try {
+      testXML(reader.getCurrentValue(), "E", "", "");
+      fail("Fragment returned, and it somehow passed XML parsing.");
+    } catch (SAXParseException e) {
+      // ignore
+    }
+    assertTrue(!reader.nextKeyValue());
+  }
+  
+  @Test
+  public void testLineSplitting() throws Exception {
+    File f = createFile(xml4);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "A", "B", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "E", "F", "");
+    assertTrue(!reader.nextKeyValue());
+  }
+  
+  @Test
+  public void testNoEndTokenHandling() throws Exception {
+    File f = createFile(xml5);
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue("Not enough records returned.", reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "A", "B", "G");
+    assertTrue("Not enough records returned.", reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue("Not enough records returned.", reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "", "", "H");
+    assertTrue("Not enough records returned.", reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "E", "F", "");
+    assertTrue("Not enough records returned.", reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "", "", "I");
+    assertTrue("Too many records returned.", !reader.nextKeyValue());
+  }
+  
 }

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/pom.xml?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/pom.xml (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/pom.xml Mon Dec  5 20:05:49 2011
@@ -1,13 +1,13 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
+  contributor license agreements. See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
+  the License. You may obtain a copy of the License at
 
-      http://www.apache.org/licenses/LICENSE-2.0
+  http://www.apache.org/licenses/LICENSE-2.0
 
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,8 +15,7 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
   <modelVersion>4.0.0</modelVersion>
 
@@ -275,9 +274,9 @@
         <version>${version.jboss-logging}</version>
       </dependency>
       <dependency>
-      	<groupId>com.googlecode</groupId>
-      	<artifactId>minlog</artifactId>
-      	<version>${version.minlog}</version>
+        <groupId>com.googlecode</groupId>
+        <artifactId>minlog</artifactId>
+        <version>${version.minlog}</version>
       </dependency>
     </dependencies>
   </dependencyManagement>

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query-war/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query-war/pom.xml?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query-war/pom.xml (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query-war/pom.xml Mon Dec  5 20:05:49 2011
@@ -1,13 +1,13 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
+  contributor license agreements. See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
+  the License. You may obtain a copy of the License at
 
-      http://www.apache.org/licenses/LICENSE-2.0
+  http://www.apache.org/licenses/LICENSE-2.0
 
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/pom.xml?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/pom.xml (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/pom.xml Mon Dec  5 20:05:49 2011
@@ -1,13 +1,13 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
+  contributor license agreements. See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
+  the License. You may obtain a copy of the License at
 
-      http://www.apache.org/licenses/LICENSE-2.0
+  http://www.apache.org/licenses/LICENSE-2.0
 
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/function/QueryFunctions.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/function/QueryFunctions.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/function/QueryFunctions.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/function/QueryFunctions.java Mon Dec  5 20:05:49 2011
@@ -1,68 +1,68 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package function;
 
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.log4j.Logger;
 
 public class QueryFunctions {
-	
-	protected static Logger log = Logger.getLogger(QueryFunctions.class);
-
-	public static boolean between (String fieldValue, double left, double right) {
-		try {
-			Double value = Double.parseDouble(fieldValue);
-			if (value >= left && value <= right)
-				return true;
-			return false;
-		} catch (NumberFormatException nfe) {
-			return false;
-		}
-	}
-
-	public static boolean between (String fieldValue, long left, long right) {
-		try {
-			Long value = Long.parseLong(fieldValue);
-			if (value >= left && value <= right)
-				return true;
-			return false;
-		} catch (NumberFormatException nfe) {
-			return false;
-		}
-	}
-	
-	public static Number abs (String fieldValue) {
-		Number retval = null;
-		try {
-			Number value = NumberUtils.createNumber(fieldValue);
-			if (null == value)
-				retval = (Number) Integer.MIN_VALUE;
-			else if (value instanceof Long)
-				retval = Math.abs(value.longValue());
-			else if (value instanceof Double)
-				retval = Math.abs(value.doubleValue());
-			else if (value instanceof Float)
-				retval = Math.abs(value.floatValue());
-			else if (value instanceof Integer)
-				retval = Math.abs(value.intValue());
-		} catch (NumberFormatException nfe) {
-			return (Number) Integer.MIN_VALUE;
-		}
-		return retval;
-	}
-	
+  
+  protected static Logger log = Logger.getLogger(QueryFunctions.class);
+  
+  public static boolean between(String fieldValue, double left, double right) {
+    try {
+      Double value = Double.parseDouble(fieldValue);
+      if (value >= left && value <= right)
+        return true;
+      return false;
+    } catch (NumberFormatException nfe) {
+      return false;
+    }
+  }
+  
+  public static boolean between(String fieldValue, long left, long right) {
+    try {
+      Long value = Long.parseLong(fieldValue);
+      if (value >= left && value <= right)
+        return true;
+      return false;
+    } catch (NumberFormatException nfe) {
+      return false;
+    }
+  }
+  
+  public static Number abs(String fieldValue) {
+    Number retval = null;
+    try {
+      Number value = NumberUtils.createNumber(fieldValue);
+      if (null == value)
+        retval = (Number) Integer.MIN_VALUE;
+      else if (value instanceof Long)
+        retval = Math.abs(value.longValue());
+      else if (value instanceof Double)
+        retval = Math.abs(value.doubleValue());
+      else if (value instanceof Float)
+        retval = Math.abs(value.floatValue());
+      else if (value instanceof Integer)
+        retval = Math.abs(value.intValue());
+    } catch (NumberFormatException nfe) {
+      return (Number) Integer.MIN_VALUE;
+    }
+    return retval;
+  }
+  
 }