You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/01/12 17:06:20 UTC
svn commit: r1230608 [2/16] - in /incubator/accumulo/trunk: ./
contrib/accumulo_sample/ src/assemble/ src/core/
src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/
src/core/src/main/java/org/apache/accumulo/core/master/thrift/ src/core/...
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/combiner/StatsCombiner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/combiner/StatsCombiner.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/combiner/StatsCombiner.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/combiner/StatsCombiner.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.combiner;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Combiner;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * A {@link Combiner} that collapses all values of a key into one "min,max,sum,count" summary.
+ * <p>
+ * Every incoming value is a comma separated string. A value with a single field is treated as one raw number; any other value is
+ * read as a previously written summary whose first four fields are min, max, sum and count. All numbers are parsed and written in
+ * the configured radix (see {@link #RADIX_OPTION}), which defaults to 10.
+ */
+public class StatsCombiner extends Combiner {
+
+  /** Name of the iterator option that selects the radix used to parse and format numbers. */
+  public static final String RADIX_OPTION = "radix";
+
+  private int radix = 10;
+
+  /**
+   * Folds the values of one key into a single summary value.
+   *
+   * @param key
+   *          the key being combined (not used by this implementation)
+   * @param iter
+   *          the values to combine
+   * @return a value of the form "min,max,sum,count", formatted in the configured radix
+   * @throws NumberFormatException
+   *           if a field cannot be parsed as a long in the configured radix
+   */
+  @Override
+  public Value reduce(Key key, Iterator<Value> iter) {
+
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    long sum = 0;
+    long count = 0;
+
+    while (iter.hasNext()) {
+      String[] stats = iter.next().toString().split(",");
+
+      if (stats.length == 1) {
+        // a single raw observation
+        long val = Long.parseLong(stats[0], radix);
+        min = Math.min(val, min);
+        max = Math.max(val, max);
+        sum += val;
+        count += 1;
+      } else {
+        // a summary written by an earlier combine: min,max,sum,count
+        min = Math.min(Long.parseLong(stats[0], radix), min);
+        max = Math.max(Long.parseLong(stats[1], radix), max);
+        sum += Long.parseLong(stats[2], radix);
+        count += Long.parseLong(stats[3], radix);
+      }
+    }
+
+    String ret = Long.toString(min, radix) + "," + Long.toString(max, radix) + "," + Long.toString(sum, radix) + "," + Long.toString(count, radix);
+    return new Value(ret.getBytes());
+  }
+
+  /**
+   * Initializes the combiner and reads the radix from the options, falling back to 10 when {@link #RADIX_OPTION} is absent.
+   */
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+
+    if (options.containsKey(RADIX_OPTION))
+      radix = Integer.parseInt(options.get(RADIX_OPTION));
+    else
+      radix = 10;
+  }
+
+  @Override
+  public IteratorOptions describeOptions() {
+    IteratorOptions io = super.describeOptions();
+    io.setName("statsCombiner");
+    io.setDescription("Combiner that keeps track of min, max, sum, and count");
+    io.addNamedOption(RADIX_OPTION, "radix/base of the numbers");
+    return io;
+  }
+
+  /**
+   * Validates the options accepted by this combiner.
+   *
+   * @param options
+   *          the options to check
+   * @return false if the parent class rejects the options or if {@link #RADIX_OPTION} is present but is not a radix that
+   *         {@link Long#parseLong(String, int)} supports; true otherwise
+   */
+  @Override
+  public boolean validateOptions(Map<String,String> options) {
+    if (!super.validateOptions(options))
+      return false;
+
+    if (options.containsKey(RADIX_OPTION) && !isSupportedRadix(options.get(RADIX_OPTION)))
+      return false;
+
+    return true;
+  }
+
+  /**
+   * Tells whether the text is a decimal integer within {@link Character#MIN_RADIX} and {@link Character#MAX_RADIX}. A radix outside
+   * that range would pass a digits-only check but make every later parse in {@link #reduce(Key, Iterator)} fail.
+   */
+  private static boolean isSupportedRadix(String text) {
+    if (text == null || !text.matches("\\d+"))
+      return false;
+
+    try {
+      int candidate = Integer.parseInt(text);
+      return candidate >= Character.MIN_RADIX && candidate <= Character.MAX_RADIX;
+    } catch (NumberFormatException e) {
+      // all digits, but too large to fit in an int
+      return false;
+    }
+  }
+
+  /**
+   * A convenience method for setting the expected base/radix of the numbers
+   *
+   * @param iterConfig
+   *          Iterator settings to configure
+   * @param base
+   *          The radix in which numbers are parsed and written
+   */
+  public static void setRadix(IteratorSetting iterConfig, int base) {
+    iterConfig.addOption(RADIX_OPTION, base + "");
+  }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/combiner/StatsCombiner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/AlphaNumKeyConstraint.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/AlphaNumKeyConstraint.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/AlphaNumKeyConstraint.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/AlphaNumKeyConstraint.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.constraints;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.constraints.Constraint;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
+
+/**
+ * An Accumulo constraint that reports a violation whenever the row, a column family, or a column qualifier of a mutation contains a
+ * byte outside the ranges a-z, A-Z and 0-9.
+ */
+public class AlphaNumKeyConstraint implements Constraint {
+
+  private static final short NON_ALPHA_NUM_ROW = 1;
+  private static final short NON_ALPHA_NUM_COLF = 2;
+  private static final short NON_ALPHA_NUM_COLQ = 3;
+
+  /**
+   * Returns true when every byte is an ASCII letter or digit; an empty array is accepted.
+   */
+  private boolean isAlphaNum(byte bytes[]) {
+    for (int i = 0; i < bytes.length; i++) {
+      byte current = bytes[i];
+      boolean lower = current >= 'a' && current <= 'z';
+      boolean upper = current >= 'A' && current <= 'Z';
+      boolean digit = current >= '0' && current <= '9';
+      if (!(lower || upper || digit))
+        return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Records a violation code, creating the list on first use and never storing the same code twice.
+   */
+  private List<Short> addViolation(List<Short> violations, short violation) {
+    List<Short> result = violations;
+    if (result == null)
+      result = new ArrayList<Short>();
+
+    if (!result.contains(violation))
+      result.add(violation);
+
+    return result;
+  }
+
+  /**
+   * Checks the row and every column update of the mutation.
+   *
+   * @return the distinct violation codes found, or null when there are none
+   */
+  @Override
+  public List<Short> check(Environment env, Mutation mutation) {
+    List<Short> found = null;
+
+    boolean rowOk = isAlphaNum(mutation.getRow());
+    if (!rowOk)
+      found = addViolation(found, NON_ALPHA_NUM_ROW);
+
+    Collection<ColumnUpdate> columnUpdates = mutation.getUpdates();
+    for (ColumnUpdate update : columnUpdates) {
+      boolean familyOk = isAlphaNum(update.getColumnFamily());
+      if (!familyOk)
+        found = addViolation(found, NON_ALPHA_NUM_COLF);
+
+      boolean qualifierOk = isAlphaNum(update.getColumnQualifier());
+      if (!qualifierOk)
+        found = addViolation(found, NON_ALPHA_NUM_COLQ);
+    }
+
+    return found;
+  }
+
+  /**
+   * Translates a violation code produced by {@link #check(Environment, Mutation)} into text; unknown codes yield null.
+   */
+  @Override
+  public String getViolationDescription(short violationCode) {
+    if (violationCode == NON_ALPHA_NUM_ROW)
+      return "Row was not alpha numeric";
+
+    if (violationCode == NON_ALPHA_NUM_COLF)
+      return "Column family was not alpha numeric";
+
+    if (violationCode == NON_ALPHA_NUM_COLQ)
+      return "Column qualifier was not alpha numeric";
+
+    return null;
+  }
+
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/AlphaNumKeyConstraint.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/MaxMutationSize.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/MaxMutationSize.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/MaxMutationSize.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/MaxMutationSize.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.constraints;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.constraints.Constraint;
+import org.apache.accumulo.core.data.Mutation;
+
+/**
+ * Ensure that mutations are a reasonable size: we must be able to fit several in memory at a time.
+ * <p>
+ * A mutation is in violation when its estimated memory use is not below {@link #MAX_SIZE}.
+ */
+public class MaxMutationSize implements Constraint {
+  /** Size limit: the JVM's maximum memory shifted right by 8 bits, i.e. 1/256 of it. */
+  static final long MAX_SIZE = Runtime.getRuntime().maxMemory() >> 8;
+  /** Shared result for mutations that pass the check. */
+  static final List<Short> empty = Collections.emptyList();
+  /** Shared result for mutations that fail the check; holds the single violation code 0. */
+  static final List<Short> violations = Collections.singletonList(Short.valueOf((short) 0));
+
+  /**
+   * Describes the size violation. The code is ignored because this constraint only ever reports one kind of violation.
+   */
+  @Override
+  public String getViolationDescription(short violationCode) {
+    return String.format("mutation exceeded maximum size of %d", MAX_SIZE);
+  }
+
+  /**
+   * Compares the mutation's estimated memory use against {@link #MAX_SIZE}.
+   *
+   * @return an empty list when the mutation is below the limit, otherwise a one element list containing violation code 0
+   */
+  @Override
+  public List<Short> check(Environment env, Mutation mutation) {
+    if (mutation.estimatedMemoryUsed() < MAX_SIZE)
+      return empty;
+    return violations;
+  }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/MaxMutationSize.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/NumericValueConstraint.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/NumericValueConstraint.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/NumericValueConstraint.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/NumericValueConstraint.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.constraints;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.constraints.Constraint;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
+
+/**
+ * An Accumulo constraint that reports a violation when the value of any column update contains a byte outside '0'-'9'.
+ */
+public class NumericValueConstraint implements Constraint {
+
+  private static final short NON_NUMERIC_VALUE = 1;
+
+  /**
+   * Returns true when every byte is an ASCII digit; an empty array is accepted.
+   */
+  private boolean isNumeric(byte bytes[]) {
+    for (int i = 0; i < bytes.length; i++) {
+      byte current = bytes[i];
+      if (current < '0' || current > '9')
+        return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Records a violation code, creating the list on first use and never storing the same code twice.
+   */
+  private List<Short> addViolation(List<Short> violations, short violation) {
+    List<Short> result = violations;
+    if (result == null)
+      result = new ArrayList<Short>();
+
+    if (!result.contains(violation))
+      result.add(violation);
+
+    return result;
+  }
+
+  /**
+   * Checks the value of every column update in the mutation.
+   *
+   * @return a list holding the non-numeric violation code, or null when every value is numeric
+   */
+  @Override
+  public List<Short> check(Environment env, Mutation mutation) {
+    List<Short> found = null;
+
+    Collection<ColumnUpdate> columnUpdates = mutation.getUpdates();
+    for (ColumnUpdate update : columnUpdates) {
+      boolean valueOk = isNumeric(update.getValue());
+      if (!valueOk)
+        found = addViolation(found, NON_NUMERIC_VALUE);
+    }
+
+    return found;
+  }
+
+  /**
+   * Translates a violation code produced by {@link #check(Environment, Mutation)} into text; unknown codes yield null.
+   */
+  @Override
+  public String getViolationDescription(short violationCode) {
+    if (violationCode == NON_NUMERIC_VALUE)
+      return "Value is not numeric";
+
+    return null;
+  }
+
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/constraints/NumericValueConstraint.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/FileCount.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/FileCount.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/FileCount.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/FileCount.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.dirlist;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Computes directory and file counts for a directory listing table and writes them back into the same table.
+ * <p>
+ * Rows are expected to start with a three digit, zero padded depth followed by the path (see
+ * {@link #createMutation(int, String, CountValue)}). Counting starts at the deepest level found and works upward one depth at a
+ * time, so each level can read the counts written for the level below it.
+ */
+public class FileCount {
+
+  private int entriesScanned;
+  private int inserts;
+
+  private Connector conn;
+  private Authorizations auths;
+  private ColumnVisibility colvis;
+  private String table;
+
+  /**
+   * The four counters kept per directory, serialized as "dirCount,fileCount,recursiveDirCount,recursiveFileCount".
+   */
+  private static class CountValue {
+    int dirCount = 0;
+    int fileCount = 0;
+    int recursiveDirCount = 0;
+    int recursiveFileCount = 0;
+
+    /** Loads all four counters from a serialized value. */
+    void set(Value val) {
+      String sa[] = val.toString().split(",");
+      dirCount = Integer.parseInt(sa[0]);
+      fileCount = Integer.parseInt(sa[1]);
+      recursiveDirCount = Integer.parseInt(sa[2]);
+      recursiveFileCount = Integer.parseInt(sa[3]);
+    }
+
+    /** Serializes the four counters in the format read by {@link #set(Value)}. */
+    Value toValue() {
+      return new Value((dirCount + "," + fileCount + "," + recursiveDirCount + "," + recursiveFileCount).getBytes());
+    }
+
+    /** Counts one direct child file, which is also part of the recursive total. */
+    void incrementFiles() {
+      fileCount++;
+      recursiveFileCount++;
+    }
+
+    /** Counts one direct child directory, which is also part of the recursive total. */
+    void incrementDirs() {
+      dirCount++;
+      recursiveDirCount++;
+    }
+
+    public void clear() {
+      dirCount = 0;
+      fileCount = 0;
+      recursiveDirCount = 0;
+      recursiveFileCount = 0;
+    }
+
+    /** Adds only the recursive totals of another counter; its direct counts are not carried over. */
+    public void incrementRecursive(CountValue other) {
+      recursiveDirCount += other.recursiveDirCount;
+      recursiveFileCount += other.recursiveFileCount;
+    }
+  }
+
+  /** Continues the binary search on [min, max], probing the midpoint of that interval. */
+  private int findMaxDepth(Scanner scanner, int min, int max) {
+    int mid = min + (max - min) / 2;
+    return findMaxDepth(scanner, min, mid, max);
+  }
+
+  /**
+   * One step of the binary search for the largest depth that has at least one row.
+   *
+   * @return the largest existing depth within [min, max], or -1 if the interval is empty or no probed depth exists
+   */
+  private int findMaxDepth(Scanner scanner, int min, int mid, int max) {
+    // check to see if the mid point exist
+    if (max < min)
+      return -1;
+
+    scanner.setRange(new Range(String.format("%03d", mid), true, String.format("%03d", mid + 1), false));
+
+    if (scanner.iterator().hasNext()) {
+      // this depth exist, check to see if a larger depth exist
+      int ret = findMaxDepth(scanner, mid + 1, max);
+      if (ret == -1)
+        return mid; // this must the max
+      else
+        return ret;
+    } else {
+      // this depth does not exist, look lower
+      return findMaxDepth(scanner, min, mid - 1);
+    }
+
+  }
+
+  /**
+   * Finds the largest depth present in the table, searching depths 0 through 999 and probing depth 64 first. The scanner's batch
+   * size is lowered to 100 during the search and restored afterwards.
+   */
+  private int findMaxDepth(Scanner scanner) {
+    // do binary search to find max depth
+    int origBatchSize = scanner.getBatchSize();
+    scanner.setBatchSize(100);
+    int depth = findMaxDepth(scanner, 0, 64, 999);
+    scanner.setBatchSize(origBatchSize);
+    return depth;
+  }
+
+  /**
+   * Consumes the remainder of the row that the given entry belongs to, loading the counts column into cv when it is seen.
+   *
+   * @return the first entry of the next row, or null when the iterator is exhausted
+   */
+  // find the count column and consume a row
+  private Entry<Key,Value> findCount(Entry<Key,Value> entry, Iterator<Entry<Key,Value>> iterator, CountValue cv) {
+
+    Key key = entry.getKey();
+    Text currentRow = key.getRow();
+
+    // the caller only passes entries whose column family is the directory family, so only the qualifier is checked here
+    if (key.compareColumnQualifier(QueryUtil.COUNTS_COLQ) == 0)
+      cv.set(entry.getValue());
+
+    while (iterator.hasNext()) {
+      entry = iterator.next();
+      entriesScanned++;
+      key = entry.getKey();
+
+      if (key.compareRow(currentRow) != 0)
+        return entry;
+
+      if (key.compareColumnFamily(QueryUtil.DIR_COLF) == 0 && key.compareColumnQualifier(QueryUtil.COUNTS_COLQ) == 0) {
+        cv.set(entry.getValue());
+      }
+
+    }
+
+    return null;
+  }
+
+  /**
+   * Skips the remainder of the row that the given entry belongs to.
+   *
+   * @return the first entry of the next row, or null when the iterator is exhausted
+   */
+  private Entry<Key,Value> consumeRow(Entry<Key,Value> entry, Iterator<Entry<Key,Value>> iterator) {
+    Key key = entry.getKey();
+    Text currentRow = key.getRow();
+
+    while (iterator.hasNext()) {
+      entry = iterator.next();
+      entriesScanned++;
+      key = entry.getKey();
+
+      if (key.compareRow(currentRow) != 0)
+        return entry;
+    }
+
+    return null;
+  }
+
+  /** Returns the parent path of a row: the row without its three character depth prefix and without its last path component. */
+  private String extractDir(Key key) {
+    String row = key.getRowData().toString();
+    return row.substring(3, row.lastIndexOf('/'));
+  }
+
+  /** Builds the mutation that stores the counts of a directory in the row made of the zero padded depth followed by the path. */
+  private Mutation createMutation(int depth, String dir, CountValue countVal) {
+    Mutation m = new Mutation(String.format("%03d%s", depth, dir));
+    m.put(QueryUtil.DIR_COLF, QueryUtil.COUNTS_COLQ, colvis, countVal.toValue());
+    return m;
+  }
+
+  /**
+   * Scans every row at the given depth, totals the rows per parent directory, and writes each parent's counts to its row at
+   * depth - 1.
+   * <p>
+   * A row whose first entry has the directory column family is counted as a directory and contributes its stored recursive
+   * counts; any other row is counted as a file.
+   */
+  private void calculateCounts(Scanner scanner, int depth, BatchWriter batchWriter) throws Exception {
+
+    scanner.setRange(new Range(String.format("%03d", depth), true, String.format("%03d", depth + 1), false));
+
+    CountValue countVal = new CountValue();
+
+    Iterator<Entry<Key,Value>> iterator = scanner.iterator();
+
+    String currentDir = null;
+
+    Entry<Key,Value> entry = null;
+    if (iterator.hasNext()) {
+      entry = iterator.next();
+      entriesScanned++;
+    }
+
+    while (entry != null) {
+      Key key = entry.getKey();
+
+      String dir = extractDir(key);
+
+      if (currentDir == null) {
+        currentDir = dir;
+      } else if (!currentDir.equals(dir)) {
+        // moved on to the children of another parent: write out the finished parent and start over
+        batchWriter.addMutation(createMutation(depth - 1, currentDir, countVal));
+        inserts++;
+
+        currentDir = dir;
+        countVal.clear();
+      }
+
+      // process a whole row
+      if (key.compareColumnFamily(QueryUtil.DIR_COLF) == 0) {
+        CountValue tmpCount = new CountValue();
+        entry = findCount(entry, iterator, tmpCount);
+
+        if (tmpCount.dirCount == 0 && tmpCount.fileCount == 0) {
+          // in this case the higher depth will not insert anything if the
+          // dir has no children, so insert something here
+          Mutation m = new Mutation(key.getRow());
+          m.put(QueryUtil.DIR_COLF, QueryUtil.COUNTS_COLQ, colvis, tmpCount.toValue());
+          batchWriter.addMutation(m);
+          inserts++;
+        }
+
+        countVal.incrementRecursive(tmpCount);
+        countVal.incrementDirs();
+      } else {
+        entry = consumeRow(entry, iterator);
+        countVal.incrementFiles();
+      }
+    }
+
+    if (currentDir != null) {
+      // write out the last parent directory of this depth
+      batchWriter.addMutation(createMutation(depth - 1, currentDir, countVal));
+      inserts++;
+    }
+  }
+
+  /**
+   * Connects to Accumulo and remembers the table and security settings used by {@link #run()}.
+   *
+   * @param auths
+   *          comma separated scan authorizations; an empty string means none
+   * @param colvis
+   *          column visibility expression applied to every count that is written
+   * @param mock
+   *          when true a MockInstance is used instead of a ZooKeeperInstance
+   */
+  FileCount(String instance, String zookeepers, String user, String password, String table, String auths, String colvis, boolean mock) throws Exception {
+    Instance inst;
+    if (mock) {
+      inst = new MockInstance(instance);
+    } else {
+      inst = new ZooKeeperInstance(instance, zookeepers);
+    }
+    this.conn = inst.getConnector(user, password);
+    if (auths.length() > 0)
+      this.auths = new Authorizations(auths.split(","));
+    else
+      this.auths = new Authorizations();
+    this.colvis = new ColumnVisibility(colvis);
+    this.table = table;
+  }
+
+  /**
+   * Finds the maximum depth, computes the counts for every depth from the deepest up to depth 1, and prints timing and volume
+   * statistics to standard out.
+   */
+  public void run() throws Exception {
+
+    entriesScanned = 0;
+    inserts = 0;
+
+    Scanner scanner = conn.createScanner(table, auths);
+    BatchWriter bw = conn.createBatchWriter(table, 10000000, 60000l, 3);
+
+    long t1 = System.currentTimeMillis();
+    int depth;
+    long t2;
+
+    try {
+      depth = findMaxDepth(scanner);
+
+      t2 = System.currentTimeMillis();
+
+      for (int d = depth; d > 0; d--) {
+        calculateCounts(scanner, d, bw);
+        // must flush so next depth can read what prev depth wrote
+        bw.flush();
+      }
+    } finally {
+      // close the writer even when scanning or writing fails part way through
+      bw.close();
+    }
+
+    long t3 = System.currentTimeMillis();
+
+    System.out.printf("Max depth : %d\n", depth);
+    System.out.printf("Time to find max depth : %,d ms\n", (t2 - t1));
+    System.out.printf("Time to compute counts : %,d ms\n", (t3 - t2));
+    System.out.printf("Entries scanned : %,d \n", entriesScanned);
+    System.out.printf("Counts inserted : %,d \n", inserts);
+  }
+
+  /**
+   * Command line entry point; expects exactly seven arguments and prints a usage message otherwise.
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length != 7) {
+      System.out.println("usage: " + FileCount.class.getSimpleName() + " <instance> <zookeepers> <user> <pass> <table> <auths> <colvis>");
+      System.exit(1);
+    }
+
+    FileCount fileCount = new FileCount(args[0], args[1], args[2], args[3], args[4], args[5], args[6], false);
+
+    fileCount.run();
+  }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/FileCount.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/Ingest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/Ingest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/Ingest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/Ingest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.dirlist;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.LongCombiner;
+import org.apache.accumulo.core.iterators.TypedValueCombiner.Encoder;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.examples.simple.filedata.FileDataIngest;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Walks directory trees on the local file system and writes what it finds into three Accumulo tables: one entry per file or
+ * directory in a directory table, forward and reverse index entries in an index table, and the data of regular files (through
+ * {@link FileDataIngest}) in a data table.
+ */
+public class Ingest {
+  static final Value nullValue = new Value(new byte[0]);
+  public static final String LENGTH_CQ = "length";
+  public static final String HIDDEN_CQ = "hidden";
+  public static final String EXEC_CQ = "exec";
+  public static final String LASTMOD_CQ = "lastmod";
+  public static final String HASH_CQ = "md5";
+  public static final Encoder<Long> encoder = LongCombiner.FIXED_LEN_ENCODER;
+
+  /**
+   * Builds the directory table mutation that describes one file or directory.
+   * <p>
+   * Directories use the shared directory column family. Files use the encoded value of {@code Long.MAX_VALUE - lastmod} as column
+   * family (presumably so that newer versions of a file sort first; confirm against the readers of this table).
+   *
+   * @param cv
+   *          visibility applied to every column
+   * @param path
+   *          path of the file or directory; "/" is stored as the empty path
+   * @param hash
+   *          hash of the file contents; the hash column is omitted when this is null or empty
+   * @return a mutation holding the length, hidden, exec and lastmod columns and, when available, the hash column
+   */
+  public static Mutation buildMutation(ColumnVisibility cv, String path, boolean isDir, boolean isHidden, boolean canExec, long length, long lastmod,
+      String hash) {
+    if (path.equals("/"))
+      path = "";
+    Mutation m = new Mutation(QueryUtil.getRow(path));
+    Text colf = null;
+    if (isDir)
+      colf = QueryUtil.DIR_COLF;
+    else
+      colf = new Text(encoder.encode(Long.MAX_VALUE - lastmod));
+    m.put(colf, new Text(LENGTH_CQ), cv, new Value(Long.toString(length).getBytes()));
+    m.put(colf, new Text(HIDDEN_CQ), cv, new Value(Boolean.toString(isHidden).getBytes()));
+    m.put(colf, new Text(EXEC_CQ), cv, new Value(Boolean.toString(canExec).getBytes()));
+    m.put(colf, new Text(LASTMOD_CQ), cv, new Value(Long.toString(lastmod).getBytes()));
+    if (hash != null && hash.length() > 0)
+      m.put(colf, new Text(HASH_CQ), cv, new Value(hash.getBytes()));
+    return m;
+  }
+
+  /**
+   * Ingests a single file or directory: its data (regular files only), its directory table entry, and its index entries.
+   * <p>
+   * A file whose data cannot be inserted is skipped entirely; the failure is reported on standard error and the walk continues.
+   */
+  private static void ingest(File src, ColumnVisibility cv, BatchWriter dirBW, BatchWriter indexBW, FileDataIngest fdi, BatchWriter data) throws Exception {
+    // build main table entry
+    String path = null;
+    try {
+      path = src.getCanonicalPath();
+    } catch (IOException e) {
+      // fall back to the unresolved path when the canonical form cannot be determined
+      path = src.getAbsolutePath();
+    }
+    System.out.println(path);
+
+    String hash = null;
+    if (!src.isDirectory()) {
+      try {
+        hash = fdi.insertFileData(path, data);
+      } catch (Exception e) {
+        // if something goes wrong, just skip this one, but leave a trace of what was skipped and why
+        System.err.println("skipping " + path + ": " + e);
+        return;
+      }
+    }
+
+    dirBW.addMutation(buildMutation(cv, path, src.isDirectory(), src.isHidden(), src.canExecute(), src.length(), src.lastModified(), hash));
+
+    // build index table entries
+    Text row = QueryUtil.getForwardIndex(path);
+    if (row != null) {
+      Text p = new Text(QueryUtil.getRow(path));
+      Mutation m = new Mutation(row);
+      m.put(QueryUtil.INDEX_COLF, p, cv, nullValue);
+      indexBW.addMutation(m);
+
+      row = QueryUtil.getReverseIndex(path);
+      m = new Mutation(row);
+      m.put(QueryUtil.INDEX_COLF, p, cv, nullValue);
+      indexBW.addMutation(m);
+    }
+  }
+
+  /**
+   * Ingests the given file and, when it is a directory, everything below it. A directory that cannot be listed is ingested itself
+   * but not descended into.
+   */
+  private static void recurse(File src, ColumnVisibility cv, BatchWriter dirBW, BatchWriter indexBW, FileDataIngest fdi, BatchWriter data) throws Exception {
+    // ingest this File
+    ingest(src, cv, dirBW, indexBW, fdi, data);
+    // recurse into subdirectories
+    if (src.isDirectory()) {
+      File[] files = src.listFiles();
+      if (files == null)
+        return;
+      for (File child : files) {
+        recurse(child, cv, dirBW, indexBW, fdi, data);
+      }
+    }
+  }
+
+  /**
+   * Command line entry point. Creates the three tables when they do not exist, ingests every directory given after the ninth
+   * argument together with its parent directories, and finally ingests the root directory.
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 10) {
+      System.out.println("usage: " + Ingest.class.getSimpleName()
+          + " <instance> <zoo> <user> <pass> <dir table> <index table> <data table> <visibility> <data chunk size> <dir>{ <dir>}");
+      System.exit(1);
+    }
+
+    String instance = args[0];
+    String zooKeepers = args[1];
+    String user = args[2];
+    String pass = args[3];
+    String nameTable = args[4];
+    String indexTable = args[5];
+    String dataTable = args[6];
+    ColumnVisibility colvis = new ColumnVisibility(args[7]);
+    int chunkSize = Integer.parseInt(args[8]);
+
+    Connector conn = new ZooKeeperInstance(instance, zooKeepers).getConnector(user, pass.getBytes());
+    if (!conn.tableOperations().exists(nameTable))
+      conn.tableOperations().create(nameTable);
+    if (!conn.tableOperations().exists(indexTable))
+      conn.tableOperations().create(indexTable);
+    if (!conn.tableOperations().exists(dataTable))
+      conn.tableOperations().create(dataTable);
+
+    BatchWriter dirBW = conn.createBatchWriter(nameTable, 50000000, 300000l, 4);
+    BatchWriter indexBW = conn.createBatchWriter(indexTable, 50000000, 300000l, 4);
+    BatchWriter dataBW = conn.createBatchWriter(dataTable, 50000000, 300000l, 4);
+    try {
+      FileDataIngest fdi = new FileDataIngest(chunkSize, colvis);
+      for (int i = 9; i < args.length; i++) {
+        recurse(new File(args[i]), colvis, dirBW, indexBW, fdi, dataBW);
+
+        // fill in parent directory info
+        String file = args[i];
+        int slashIndex = -1;
+        while ((slashIndex = file.lastIndexOf("/")) > 0) {
+          file = file.substring(0, slashIndex);
+          ingest(new File(file), colvis, dirBW, indexBW, fdi, dataBW);
+        }
+      }
+      ingest(new File("/"), colvis, dirBW, indexBW, fdi, dataBW);
+    } finally {
+      // close the writers even when the walk fails part way through
+      dirBW.close();
+      indexBW.close();
+      dataBW.close();
+    }
+  }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/Ingest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/QueryUtil.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/QueryUtil.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/QueryUtil.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/QueryUtil.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.dirlist;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Query helper for the directory listing example. It builds the row keys used for directory
+ * entries and for the forward/reverse file name index, and offers listing and single-wildcard
+ * search methods on top of a {@link Scanner} over one table.
+ */
+public class QueryUtil {
+  private Connector conn = null;
+  private String tableName;
+  private Authorizations auths;
+  /** Column family that {@link #getType(Text)} reports verbatim instead of decoding. */
+  public static final Text DIR_COLF = new Text("dir");
+  /** Prefix placed in front of a file name to form a forward index row. */
+  public static final Text FORWARD_PREFIX = new Text("f");
+  /** Prefix placed in front of a reversed file name to form a reverse index row. */
+  public static final Text REVERSE_PREFIX = new Text("r");
+  // The two constants below are not referenced inside this class.
+  public static final Text INDEX_COLF = new Text("i");
+  public static final Text COUNTS_COLQ = new Text("counts");
+
+  /**
+   * Connects to the given instance and remembers the table and authorizations for later scans.
+   *
+   * @param instanceName
+   *          name of the Accumulo instance
+   * @param zooKeepers
+   *          ZooKeeper connection string handed to {@link ZooKeeperInstance}
+   * @param user
+   *          user to connect as
+   * @param password
+   *          password of that user
+   * @param tableName
+   *          table every scanner created by this object reads from
+   * @param auths
+   *          authorizations every scanner created by this object uses
+   */
+  public QueryUtil(String instanceName, String zooKeepers, String user, String password, String tableName, Authorizations auths) throws AccumuloException,
+      AccumuloSecurityException {
+    ZooKeeperInstance instance = new ZooKeeperInstance(instanceName, zooKeepers);
+    conn = instance.getConnector(user, password.getBytes());
+    this.tableName = tableName;
+    this.auths = auths;
+  }
+
+  /**
+   * Counts the '/' characters in a path.
+   *
+   * @param path
+   *          the path to inspect
+   * @return the number of slashes found
+   */
+  public static int getDepth(String path) {
+    int numSlashes = 0;
+    int index = -1;
+    while ((index = path.indexOf("/", index + 1)) >= 0)
+      numSlashes++;
+    return numSlashes;
+  }
+
+  /**
+   * Builds the row for a path: the depth as three zero-padded digits followed by the path bytes.
+   *
+   * @param path
+   *          the path to encode
+   * @return the row for the path
+   */
+  public static Text getRow(String path) {
+    Text row = new Text(String.format("%03d", getDepth(path)));
+    // Append by byte count, not char count: the two differ for multi-byte characters.
+    byte[] pathBytes = path.getBytes();
+    row.append(pathBytes, 0, pathBytes.length);
+    return row;
+  }
+
+  /**
+   * Builds the forward index row for the last component of a path.
+   *
+   * @param path
+   *          a path or an unqualified name
+   * @return {@link #FORWARD_PREFIX} followed by the text after the last '/', or null when that text is empty
+   */
+  public static Text getForwardIndex(String path) {
+    String part = path.substring(path.lastIndexOf("/") + 1);
+    if (part.length() == 0)
+      return null;
+    Text row = new Text(FORWARD_PREFIX);
+    // Append by byte count, not char count: the two differ for multi-byte characters.
+    byte[] partBytes = part.getBytes();
+    row.append(partBytes, 0, partBytes.length);
+    return row;
+  }
+
+  /**
+   * Builds the reverse index row for the last component of a path.
+   *
+   * @param path
+   *          a path or an unqualified name
+   * @return {@link #REVERSE_PREFIX} followed by the bytes after the last '/' in reverse order, or null when that text is empty
+   */
+  public static Text getReverseIndex(String path) {
+    String part = path.substring(path.lastIndexOf("/") + 1);
+    if (part.length() == 0)
+      return null;
+    // Size the buffer from the encoded bytes; sizing it from part.length() overruns the array
+    // as soon as the name contains a multi-byte character.
+    byte[] forward = part.getBytes();
+    byte[] rev = new byte[forward.length];
+    int i = forward.length - 1;
+    for (byte b : forward)
+      rev[i--] = b;
+    Text row = new Text(REVERSE_PREFIX);
+    row.append(rev, 0, rev.length);
+    return row;
+  }
+
+  /**
+   * Turns a column family into the type prefix used in result map keys.
+   *
+   * @param colf
+   *          column family of an entry
+   * @return "dir:" for {@link #DIR_COLF}; otherwise the long decoded by Ingest.encoder followed by ':'
+   */
+  public static String getType(Text colf) {
+    if (colf.equals(DIR_COLF))
+      return colf.toString() + ":";
+    // NOTE(review): presumably the decoded long is a timestamp written by Ingest - confirm there.
+    return Long.toString(Ingest.encoder.decode(colf.getBytes())) + ":";
+  }
+
+  /**
+   * Scans the single row belonging to a path and flattens it into a sorted map.
+   *
+   * @param path
+   *          the path to look up; one trailing '/' is removed first
+   * @return a map holding "fullname" (the row without its three-digit depth prefix) plus one entry per column, keyed
+   *         "type:qualifier:visibility"; empty when the row has no entries
+   */
+  public Map<String,String> getData(String path) throws TableNotFoundException {
+    if (path.endsWith("/"))
+      path = path.substring(0, path.length() - 1);
+    Scanner scanner = conn.createScanner(tableName, auths);
+    scanner.setRange(new Range(getRow(path)));
+    Map<String,String> data = new TreeMap<String,String>();
+    for (Entry<Key,Value> e : scanner) {
+      String type = getType(e.getKey().getColumnFamily());
+      data.put("fullname", e.getKey().getRow().toString().substring(3));
+      data.put(type + e.getKey().getColumnQualifier().toString() + ":" + e.getKey().getColumnVisibility().toString(), new String(e.getValue().get()));
+    }
+    return data;
+  }
+
+  /**
+   * Scans every row whose key starts with the row of the given directory (with a trailing '/').
+   *
+   * @param path
+   *          the directory to list; a trailing '/' is added when missing
+   * @return a sorted map from child name (text after the last '/' of the row) to that child's data map, built like the result of
+   *         {@link #getData(String)}
+   */
+  public Map<String,Map<String,String>> getDirList(String path) throws TableNotFoundException {
+    if (!path.endsWith("/"))
+      path = path + "/";
+    Map<String,Map<String,String>> fim = new TreeMap<String,Map<String,String>>();
+    Scanner scanner = conn.createScanner(tableName, auths);
+    scanner.setRange(Range.prefix(getRow(path)));
+    for (Entry<Key,Value> e : scanner) {
+      String name = e.getKey().getRow().toString();
+      name = name.substring(name.lastIndexOf("/") + 1);
+      String type = getType(e.getKey().getColumnFamily());
+      if (!fim.containsKey(name)) {
+        fim.put(name, new TreeMap<String,String>());
+        fim.get(name).put("fullname", e.getKey().getRow().toString().substring(3));
+      }
+      fim.get(name).put(type + e.getKey().getColumnQualifier().toString() + ":" + e.getKey().getColumnVisibility().toString(), new String(e.getValue().get()));
+    }
+    return fim;
+  }
+
+  /**
+   * Scans the forward index row of an exact name.
+   *
+   * @param term
+   *          the name to look up
+   * @return the entries of the matching forward index row
+   */
+  public Iterable<Entry<Key,Value>> exactTermSearch(String term) throws Exception {
+    System.out.println("executing exactTermSearch for " + term);
+    Scanner scanner = conn.createScanner(tableName, auths);
+    scanner.setRange(new Range(getForwardIndex(term)));
+    return scanner;
+  }
+
+  /**
+   * Searches for an unqualified name with at most one wildcard, which must be the first or the last character. A leading
+   * wildcard is answered from the reverse index, a trailing one from the forward index.
+   *
+   * @param exp
+   *          the search expression, without any '/'
+   * @return the matching index entries
+   * @throws Exception
+   *           if the expression contains '/' or a wildcard that is neither leading nor trailing
+   */
+  public Iterable<Entry<Key,Value>> singleRestrictedWildCardSearch(String exp) throws Exception {
+    if (exp.indexOf("/") >= 0)
+      throw new Exception("this method only works with unqualified names");
+
+    Scanner scanner = conn.createScanner(tableName, auths);
+    if (exp.startsWith("*")) {
+      System.out.println("executing beginning wildcard search for " + exp);
+      exp = exp.substring(1);
+      scanner.setRange(Range.prefix(getReverseIndex(exp)));
+    } else if (exp.endsWith("*")) {
+      System.out.println("executing ending wildcard search for " + exp);
+      exp = exp.substring(0, exp.length() - 1);
+      scanner.setRange(Range.prefix(getForwardIndex(exp)));
+    } else if (exp.indexOf("*") >= 0) {
+      throw new Exception("this method only works for beginning or ending wild cards");
+    } else {
+      return exactTermSearch(exp);
+    }
+    return scanner;
+  }
+
+  /**
+   * Searches for a name containing at most one wildcard anywhere. No wildcard is delegated to
+   * {@link #exactTermSearch(String)}, a leading or trailing one to {@link #singleRestrictedWildCardSearch(String)}. A wildcard
+   * in the middle scans the index by the longer of the two fixed parts and filters the result with a {@link RegExFilter}.
+   *
+   * @param exp
+   *          the search expression
+   * @return the matching index entries
+   * @throws Exception
+   *           if the expression contains more than one wildcard
+   */
+  public Iterable<Entry<Key,Value>> singleWildCardSearch(String exp) throws Exception {
+    int starIndex = exp.indexOf("*");
+    if (exp.indexOf("*", starIndex + 1) >= 0)
+      throw new Exception("only one wild card for search");
+
+    if (starIndex < 0) {
+      return exactTermSearch(exp);
+    } else if (starIndex == 0 || starIndex == exp.length() - 1) {
+      return singleRestrictedWildCardSearch(exp);
+    }
+
+    String firstPart = exp.substring(0, starIndex);
+    String lastPart = exp.substring(starIndex + 1);
+    String regexString = ".*/" + exp.replace("*", "[^/]*");
+
+    Scanner scanner = conn.createScanner(tableName, auths);
+    // Scan by the longer fixed part so the prefix range is as selective as possible.
+    if (firstPart.length() >= lastPart.length()) {
+      System.out.println("executing middle wildcard search for " + regexString + " from entries starting with " + firstPart);
+      scanner.setRange(Range.prefix(getForwardIndex(firstPart)));
+    } else {
+      System.out.println("executing middle wildcard search for " + regexString + " from entries ending with " + lastPart);
+      scanner.setRange(Range.prefix(getReverseIndex(lastPart)));
+    }
+    IteratorSetting regex = new IteratorSetting(50, "regex", RegExFilter.class);
+    RegExFilter.setRegexs(regex, null, null, regexString, null, false);
+    scanner.addScanIterator(regex);
+    return scanner;
+  }
+
+  /**
+   * Command line entry point. With seven arguments the directory given as path is listed; with the additional "-search" flag
+   * the path argument is treated as a single-wildcard search expression instead.
+   *
+   * @param args
+   *          instance, zookeepers, user, pass, table, comma separated auths, path and optionally "-search"
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length != 7 && (args.length != 8 || !args[7].equals("-search"))) {
+      System.out.println("usage: " + QueryUtil.class.getSimpleName() + " <instance> <zookeepers> <user> <pass> <table> <auths> <path> [-search]");
+      System.exit(1);
+    }
+    QueryUtil q = new QueryUtil(args[0], args[1], args[2], args[3], args[4], new Authorizations(args[5].split(",")));
+    if (args.length == 8) {
+      for (Entry<Key,Value> e : q.singleWildCardSearch(args[6])) {
+        System.out.println(e.getKey().getColumnQualifier());
+      }
+    } else {
+      // Only list the directory when not searching; a search expression is not a directory path.
+      for (Entry<String,Map<String,String>> e : q.getDirList(args[6]).entrySet()) {
+        System.out.println(e);
+      }
+    }
+  }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/QueryUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/Viewer.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/Viewer.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/Viewer.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/Viewer.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.dirlist;
+
+import java.awt.BorderLayout;
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.swing.JFrame;
+import javax.swing.JScrollPane;
+import javax.swing.JSplitPane;
+import javax.swing.JTextArea;
+import javax.swing.JTree;
+import javax.swing.event.TreeExpansionEvent;
+import javax.swing.event.TreeExpansionListener;
+import javax.swing.event.TreeSelectionEvent;
+import javax.swing.event.TreeSelectionListener;
+import javax.swing.tree.DefaultMutableTreeNode;
+import javax.swing.tree.DefaultTreeModel;
+import javax.swing.tree.TreePath;
+
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.examples.simple.filedata.FileDataQuery;
+import org.apache.log4j.Logger;
+
+@SuppressWarnings("serial")
+public class Viewer extends JFrame implements TreeSelectionListener, TreeExpansionListener {
+ private static final Logger log = Logger.getLogger(Viewer.class);
+
+ JTree tree;
+ DefaultTreeModel treeModel;
+ QueryUtil q;
+ FileDataQuery fdq;
+ String topPath;
+ Map<String,DefaultMutableTreeNode> nodeNameMap;
+ JTextArea text;
+ JTextArea data;
+ JScrollPane dataPane;
+
+ public static class NodeInfo {
+ private String name;
+ private Map<String,String> data;
+
+ public NodeInfo(String name, Map<String,String> data) {
+ this.name = name;
+ this.data = data;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getFullName() {
+ String fn = data.get("fullname");
+ if (fn == null)
+ return name;
+ return fn;
+ }
+
+ public Map<String,String> getData() {
+ return data;
+ }
+
+ public String toString() {
+ return getName();
+ }
+
+ public String getHash() {
+ for (String k : data.keySet()) {
+ String[] parts = k.split(":");
+ if (parts.length >= 2 && parts[1].equals("md5")) {
+ return data.get(k);
+ }
+ }
+ return null;
+ }
+ }
+
+ public Viewer(String instanceName, String zooKeepers, String user, String password, String tableName, String dataTableName, Authorizations auths, String path)
+ throws Exception {
+ super("File Viewer");
+ setSize(1000, 800);
+ setDefaultCloseOperation(EXIT_ON_CLOSE);
+ q = new QueryUtil(instanceName, zooKeepers, user, password, tableName, auths);
+ fdq = new FileDataQuery(instanceName, zooKeepers, user, password, dataTableName, auths);
+ this.topPath = path;
+ }
+
+ public void populate(DefaultMutableTreeNode node) throws TableNotFoundException {
+ String path = ((NodeInfo) node.getUserObject()).getFullName();
+ log.debug("listing " + path);
+ for (Entry<String,Map<String,String>> e : q.getDirList(path).entrySet()) {
+ log.debug("got child for " + node.getUserObject() + ": " + e.getKey());
+ node.add(new DefaultMutableTreeNode(new NodeInfo(e.getKey(), e.getValue())));
+ }
+ }
+
+ public void populateChildren(DefaultMutableTreeNode node) throws TableNotFoundException {
+ @SuppressWarnings("unchecked")
+ Enumeration<DefaultMutableTreeNode> children = node.children();
+ while (children.hasMoreElements()) {
+ populate(children.nextElement());
+ }
+ }
+
+ public void init() throws TableNotFoundException {
+ DefaultMutableTreeNode root = new DefaultMutableTreeNode(new NodeInfo(topPath, q.getData(topPath)));
+ populate(root);
+ populateChildren(root);
+
+ treeModel = new DefaultTreeModel(root);
+ tree = new JTree(treeModel);
+ tree.addTreeExpansionListener(this);
+ tree.addTreeSelectionListener(this);
+ text = new JTextArea(getText(q.getData(topPath)));
+ data = new JTextArea("");
+ JScrollPane treePane = new JScrollPane(tree);
+ JScrollPane textPane = new JScrollPane(text);
+ dataPane = new JScrollPane(data);
+ JSplitPane infoSplitPane = new JSplitPane(JSplitPane.VERTICAL_SPLIT, textPane, dataPane);
+ JSplitPane mainSplitPane = new JSplitPane(JSplitPane.HORIZONTAL_SPLIT, treePane, infoSplitPane);
+ mainSplitPane.setDividerLocation(300);
+ infoSplitPane.setDividerLocation(150);
+ getContentPane().add(mainSplitPane, BorderLayout.CENTER);
+ }
+
+ public static String getText(DefaultMutableTreeNode node) {
+ return getText(((NodeInfo) node.getUserObject()).getData());
+ }
+
+ public static String getText(Map<String,String> data) {
+ StringBuilder sb = new StringBuilder();
+ for (String name : data.keySet()) {
+ sb.append(name);
+ sb.append(" : ");
+ sb.append(data.get(name));
+ sb.append('\n');
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public void treeExpanded(TreeExpansionEvent event) {
+ try {
+ populateChildren((DefaultMutableTreeNode) event.getPath().getLastPathComponent());
+ } catch (TableNotFoundException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void treeCollapsed(TreeExpansionEvent event) {
+ DefaultMutableTreeNode node = (DefaultMutableTreeNode) event.getPath().getLastPathComponent();
+ @SuppressWarnings("unchecked")
+ Enumeration<DefaultMutableTreeNode> children = node.children();
+ while (children.hasMoreElements()) {
+ DefaultMutableTreeNode child = children.nextElement();
+ log.debug("removing children of " + ((NodeInfo) child.getUserObject()).getFullName());
+ child.removeAllChildren();
+ }
+ }
+
+ @Override
+ public void valueChanged(TreeSelectionEvent e) {
+ TreePath selected = e.getNewLeadSelectionPath();
+ if (selected == null)
+ return;
+ DefaultMutableTreeNode node = (DefaultMutableTreeNode) selected.getLastPathComponent();
+ text.setText(getText(node));
+ try {
+ String hash = ((NodeInfo) node.getUserObject()).getHash();
+ if (hash != null) {
+ data.setText(fdq.getSomeData(hash, 10000));
+ } else {
+ data.setText("");
+ }
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 7 && args.length != 8) {
+ System.out.println("usage: " + Viewer.class.getSimpleName() + " <instance> <zoo> <user> <pass> <table> <datatable> <auths> [rootpath]");
+ System.exit(1);
+ }
+ String rootpath = "/";
+ if (args.length == 8)
+ rootpath = args[7];
+ Viewer v = new Viewer(args[0], args[1], args[2], args[3], args[4], args[5], new Authorizations(args[6].split(",")), rootpath);
+ v.init();
+ v.setVisible(true);
+ }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/Viewer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkCombiner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkCombiner.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkCombiner.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkCombiner.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.hadoop.io.Text;
+
+/*
+ * This iterator dedupes chunks and sets their visibilities to the combined
+ * visibility of the refs columns. For example, it would combine
+ *
+ * row1 refs uid1\0a A&B V0
+ * row1 refs uid2\0b C&D V0
+ * row1 ~chunk 0 A&B V1
+ * row1 ~chunk 0 C&D V1
+ * row1 ~chunk 0 E&F V1
+ * row1 ~chunk 0 G&H V1
+ *
+ * into the following...
+ *
+ * row1 refs uid1\0a A&B V0
+ * row1 refs uid2\0b C&D V0
+ * row1 ~chunk 0 (A&B)|(C&D) V1
+ *
+ */
+
+/**
+ * Iterator that passes non-chunk entries through unchanged and collapses chunk entries sharing row, column family and column
+ * qualifier into one entry whose visibility is computed from the refs columns of the same row.
+ */
+public class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+
+ // iterator the entries are read from
+ private SortedKeyValueIterator<Key,Value> source;
+ // independent deep copy of the source, used only to look up the refs columns of a row
+ private SortedKeyValueIterator<Key,Value> refsSource;
+ private static final Collection<ByteSequence> refsColf = Collections.singleton(FileDataIngest.REFS_CF_BS);
+ // single-entry cache: the most recently looked up row and its combined visibility (the value may be null)
+ private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+ // current top entry; topKey == null means this iterator has no top
+ private Key topKey = null;
+ private Value topValue = null;
+
+ public ChunkCombiner() {}
+
+ /**
+ * Stores the source and creates a deep copy of it for the refs lookups. The options are not read.
+ */
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ this.source = source;
+ this.refsSource = source.deepCopy(env);
+ }
+
+ @Override
+ public boolean hasTop() {
+ return topKey != null;
+ }
+
+ /** Advances to the next entry to emit; the source has already been moved past the current top. */
+ @Override
+ public void next() throws IOException {
+ findTop();
+ }
+
+ /** Seeks the source and positions this iterator on the first entry to emit. */
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ source.seek(range, columnFamilies, inclusive);
+ findTop();
+ }
+
+ /**
+ * Clears the top and then reads candidates from the source until _findTop() returns non-null or the source is exhausted. A
+ * candidate for which _findTop() returns null is dropped by the reset at the start of the next pass.
+ */
+ private void findTop() throws IOException {
+ do {
+ topKey = null;
+ topValue = null;
+ } while (source.hasTop() && _findTop() == null);
+ }
+
+ /**
+ * Reads one candidate top entry from the source.
+ *
+ * @return the visibility bytes of the candidate; null when the candidate is a chunk whose row has no refs entries
+ */
+ private byte[] _findTop() throws IOException {
+ long maxTS;
+
+ // copy the source's top before advancing, since the source may reuse the objects it returned
+ topKey = new Key(source.getTopKey());
+ topValue = new Value(source.getTopValue());
+ source.next();
+
+ // entries outside the chunk column family are emitted as they are
+ if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+ return topKey.getColumnVisibility().getBytes();
+
+ maxTS = topKey.getTimestamp();
+
+ // consume every following entry with the same row, column family and qualifier, keeping the largest timestamp
+ while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+ if (source.getTopKey().getTimestamp() > maxTS)
+ maxTS = source.getTopKey().getTimestamp();
+
+ // duplicates of a chunk are required to carry identical values
+ if (!topValue.equals(source.getTopValue()))
+ throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey() + " : " + diffInfo(topValue, source.getTopValue()));
+
+ source.next();
+ }
+
+ byte[] vis = getVisFromRefs();
+ if (vis != null) {
+ // rebuild the key with the combined visibility and the largest timestamp seen
+ topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(), topKey.getColumnQualifierData().toArray(), vis, maxTS);
+ }
+ return vis;
+ }
+
+ /**
+ * Combines the visibilities of all refs entries in the row of the current top key, using the cached result when the row is the
+ * one looked up last.
+ *
+ * @return the combined visibility produced by VisibilityCombiner, or null when the row has no refs entries
+ */
+ private byte[] getVisFromRefs() throws IOException {
+ Text row = topKey.getRow();
+ if (lastRowVC.containsKey(row))
+ return lastRowVC.get(row);
+ Range range = new Range(row);
+ // restrict the lookup to the refs column family of this row
+ refsSource.seek(range, refsColf, true);
+ VisibilityCombiner vc = null;
+ while (refsSource.hasTop()) {
+ if (vc == null)
+ vc = new VisibilityCombiner();
+ vc.add(refsSource.getTopKey().getColumnVisibilityData());
+ refsSource.next();
+ }
+ if (vc == null) {
+ // remember that this row has no refs so the lookup is not repeated for its other chunks
+ lastRowVC = Collections.singletonMap(row, null);
+ return null;
+ }
+ lastRowVC = Collections.singletonMap(row, vc.get());
+ return vc.get();
+ }
+
+ /**
+ * Describes how two values differ, for use in the exception message.
+ *
+ * @return a note about differing lengths, or the offset and bytes of the first difference, or null when no difference is found
+ */
+ private String diffInfo(Value v1, Value v2) {
+ if (v1.getSize() != v2.getSize()) {
+ return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+ }
+
+ byte[] vb1 = v1.get();
+ byte[] vb2 = v2.get();
+
+ for (int i = 0; i < vb1.length; i++) {
+ if (vb1[i] != vb2[i]) {
+ return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i], 0xff & vb2[i]);
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return topValue;
+ }
+
+ /**
+ * Creates a new ChunkCombiner initialised with a deep copy of the source and null options. An IOException raised by init is
+ * rethrown wrapped in an IllegalArgumentException.
+ */
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ ChunkCombiner cc = new ChunkCombiner();
+ try {
+ cc.init(source.deepCopy(env), null, env);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ return cc;
+ }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkCombiner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormat.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormat.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormat.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Input format whose records pair the list of non-chunk entries that precede a run of chunks with an {@link InputStream} over
+ * those chunks.
+ */
+public class ChunkInputFormat extends InputFormatBase<List<Entry<Key,Value>>,InputStream> {
+  /**
+   * Creates a reader that collects entries into the key list until it reaches an entry in the chunk column family, and then
+   * hands the shared iterator to the value stream.
+   */
+  @Override
+  public RecordReader<List<Entry<Key,Value>>,InputStream> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    return new RecordReaderBase<List<Entry<Key,Value>>,InputStream>() {
+      // look-ahead view of the scanner, shared with the ChunkInputStream used as value
+      private PeekingIterator<Entry<Key,Value>> entries;
+
+      @Override
+      public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
+        super.initialize(inSplit, attempt);
+        entries = new PeekingIterator<Entry<Key,Value>>(scannerIterator);
+        currentK = new ArrayList<Entry<Key,Value>>();
+        currentV = new ChunkInputStream();
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        currentK.clear();
+        if (!entries.hasNext()) {
+          return false;
+        }
+        ++numKeysRead;
+
+        // gather everything in front of the first chunk entry into the key list
+        Entry<Key,Value> head = entries.peek();
+        for (;;) {
+          if (head.getKey().getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
+            break;
+          }
+          currentK.add(head);
+          entries.next();
+          if (!entries.hasNext()) {
+            // ran out of entries before any chunk: emit the record without touching the stream
+            return true;
+          }
+          head = entries.peek();
+        }
+
+        // the chunk entry is only peeked at; the stream consumes it from the shared iterator
+        currentKey = head.getKey();
+        ChunkInputStream chunkStream = (ChunkInputStream) currentV;
+        chunkStream.setSource(entries);
+        if (log.isTraceEnabled()) {
+          log.trace("Processing key/value pair: " + DefaultFormatter.formatEntry(head, true));
+        }
+        return true;
+      }
+    };
+  }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStream.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStream.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStream.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStream.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * InputStream that presents the values of consecutive chunk entries of one row, read from a shared PeekingIterator, as a
+ * single byte stream. A chunk with an empty value is treated as the end marker.
+ */
+public class ChunkInputStream extends InputStream {
+ private static final Logger log = Logger.getLogger(ChunkInputStream.class);
+
+ // iterator the chunks are read from; null while no source is attached (see clear())
+ protected PeekingIterator<Entry<Key,Value>> source;
+ // key of the chunk currently held in buf
+ protected Key currentKey;
+ // column visibilities of the chunk entries seen since the last setSource()
+ protected Set<Text> currentVis;
+ // values parsed from the column qualifier of the current chunk by FileDataIngest.bytesToInt (offset 4 and offset 0)
+ protected int currentChunk;
+ protected int currentChunkSize;
+ // true once a chunk with an empty value has been read; also set when the source is empty at setSource()
+ protected boolean gotEndMarker;
+
+ // value bytes of the current chunk, the number of valid bytes in it, and the next read position
+ protected byte buf[];
+ protected int count;
+ protected int pos;
+
+ public ChunkInputStream() {
+ source = null;
+ }
+
+ public ChunkInputStream(PeekingIterator<Entry<Key,Value>> in) {
+ setSource(in);
+ }
+
+ /**
+ * Attaches a new source and reads forward to its first chunk entry.
+ *
+ * @throws RuntimeException
+ * if a source is still attached, i.e. the previous one was not read to the end or closed
+ */
+ public void setSource(PeekingIterator<Entry<Key,Value>> in) {
+ if (source != null)
+ throw new RuntimeException("setting new source without closing old one");
+ this.source = in;
+ currentVis = new TreeSet<Text>();
+ count = pos = 0;
+ if (!source.hasNext()) {
+ log.debug("source has no next");
+ gotEndMarker = true;
+ return;
+ }
+
+ // read forward until we reach a chunk
+ Entry<Key,Value> entry = source.next();
+ currentKey = entry.getKey();
+ buf = entry.getValue().get();
+ while (!currentKey.getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
+ log.debug("skipping key: " + currentKey.toString());
+ // no chunk found: return with count still 0 and gotEndMarker left unchanged
+ if (!source.hasNext())
+ return;
+ entry = source.next();
+ currentKey = entry.getKey();
+ buf = entry.getValue().get();
+ }
+ log.debug("starting chunk: " + currentKey.toString());
+ count = buf.length;
+ currentVis.add(currentKey.getColumnVisibility());
+ currentChunk = FileDataIngest.bytesToInt(currentKey.getColumnQualifier().getBytes(), 4);
+ currentChunkSize = FileDataIngest.bytesToInt(currentKey.getColumnQualifier().getBytes(), 0);
+ gotEndMarker = false;
+ }
+
+ /**
+ * Loads the next chunk of the current row into buf, skipping non-chunk entries, chunks with a different chunk size and
+ * repeats of the current chunk (whose visibilities are still recorded).
+ *
+ * @return the number of bytes now available; 0 when the source is exhausted after the end marker; -1 when the next entry
+ * belongs to another row and the end marker was seen
+ * @throws IOException
+ * if the data ends or the row changes before an end marker, a chunk follows the end marker, or a chunk number is
+ * skipped
+ */
+ private int fill() throws IOException {
+ if (source == null || !source.hasNext()) {
+ if (gotEndMarker)
+ return count = pos = 0;
+ else
+ throw new IOException("no end chunk marker but source has no data");
+ }
+
+ // only peek here so that an entry of the next row stays in the iterator
+ Entry<Key,Value> entry = source.peek();
+ Key thisKey = entry.getKey();
+ log.debug("evaluating key: " + thisKey.toString());
+
+ // check that we're still on the same row
+ if (!thisKey.equals(currentKey, PartialKey.ROW)) {
+ if (gotEndMarker)
+ return -1;
+ else {
+ // take the row name before clear() resets currentKey
+ String currentRow = currentKey.getRow().toString();
+ clear();
+ throw new IOException("got to the end of the row without end chunk marker " + currentRow);
+ }
+ }
+ log.debug("matches current key");
+
+ // ok to advance the iterator
+ source.next();
+
+ // check that this is part of a chunk
+ if (!thisKey.getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
+ log.debug("skipping non-chunk key");
+ return fill();
+ }
+ log.debug("is a chunk");
+
+ // check that the chunk size is the same as the one being read
+ if (currentChunkSize != FileDataIngest.bytesToInt(thisKey.getColumnQualifier().getBytes(), 0)) {
+ log.debug("skipping chunk of different size");
+ return fill();
+ }
+
+ // add the visibility to the list if it's not there
+ if (!currentVis.contains(thisKey.getColumnVisibility()))
+ currentVis.add(thisKey.getColumnVisibility());
+
+ // check to see if it is an identical chunk with a different visibility
+ if (thisKey.getColumnQualifier().equals(currentKey.getColumnQualifier())) {
+ log.debug("skipping identical chunk with different visibility");
+ return fill();
+ }
+
+ if (gotEndMarker) {
+ log.debug("got another chunk after end marker: " + currentKey.toString() + " " + thisKey.toString());
+ clear();
+ throw new IOException("found extra chunk after end marker");
+ }
+
+ // got new chunk of the same file, check that it's the next chunk
+ int thisChunk = FileDataIngest.bytesToInt(thisKey.getColumnQualifier().getBytes(), 4);
+ if (thisChunk != currentChunk + 1) {
+ log.debug("new chunk same file, unexpected chunkID: " + currentKey.toString() + " " + thisKey.toString());
+ clear();
+ throw new IOException("missing chunks between " + currentChunk + " and " + thisChunk);
+ }
+
+ currentKey = thisKey;
+ currentChunk = thisChunk;
+ buf = entry.getValue().get();
+ pos = 0;
+
+ // check to see if it's the last chunk
+ if (buf.length == 0) {
+ gotEndMarker = true;
+ // keep going so that trailing repeats of the end marker are consumed and anything after it is detected
+ return fill();
+ }
+
+ return count = buf.length;
+ }
+
+ /**
+ * Returns the visibilities collected since the last setSource().
+ *
+ * @throws RuntimeException
+ * if a source is still attached, i.e. the stream has not been read to the end or closed
+ */
+ public Set<Text> getVisibilities() {
+ if (source != null)
+ throw new RuntimeException("don't get visibilities before chunks have been completely read");
+ return currentVis;
+ }
+
+ /**
+ * Reads one byte, loading the next chunk when the buffer is used up.
+ *
+ * @return the byte as a value from 0 to 255, or -1 when no source is attached or no further data is available; in the latter
+ * case the stream detaches from its source through clear()
+ */
+ public int read() throws IOException {
+ if (source == null)
+ return -1;
+ log.debug("pos: " + pos + " count: " + count);
+ if (pos >= count) {
+ if (fill() <= 0) {
+ log.debug("done reading input stream at key: " + (currentKey == null ? "null" : currentKey.toString()));
+ if (source != null && source.hasNext())
+ log.debug("next key: " + source.peek().getKey());
+ clear();
+ return -1;
+ }
+ }
+ return buf[pos++] & 0xff;
+ }
+
+ /**
+ * Reads up to len bytes into b starting at off, continuing across chunks until len bytes are copied or the data ends.
+ *
+ * @return the number of bytes copied; 0 when len is 0; -1 when the data ended before any byte was copied
+ * @throws NullPointerException
+ * if b is null
+ * @throws IndexOutOfBoundsException
+ * if off and len do not describe a range inside b
+ */
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+
+ log.debug("filling buffer " + off + " " + len);
+ int total = 0;
+ while (total < len) {
+ int avail = count - pos;
+ log.debug(avail + " available in current local buffer");
+ if (avail <= 0) {
+ if (fill() <= 0) {
+ log.debug("done reading input stream at key: " + (currentKey == null ? "null" : currentKey.toString()));
+ if (source != null && source.hasNext())
+ log.debug("next key: " + source.peek().getKey());
+ clear();
+ log.debug("filled " + total + " bytes");
+ return total == 0 ? -1 : total;
+ }
+ avail = count - pos;
+ }
+
+ // copy whichever is smaller: what the current chunk still holds or what the caller still wants
+ int cnt = (avail < len - total) ? avail : len - total;
+ log.debug("copying from local buffer: local pos " + pos + " into pos " + off + " len " + cnt);
+ System.arraycopy(buf, pos, b, off, cnt);
+ pos += cnt;
+ off += cnt;
+ total += cnt;
+ }
+ log.debug("filled " + total + " bytes");
+ return total;
+ }
+
+ /**
+ * Detaches the source and resets the buffer, current key, chunk number and positions. gotEndMarker and the collected
+ * visibilities are left as they are.
+ */
+ public void clear() {
+ source = null;
+ buf = null;
+ currentKey = null;
+ currentChunk = 0;
+ pos = count = 0;
+ }
+
+ /**
+ * Skips the remaining chunks of the current row by calling fill() until it reports no more data, then clears the stream. An
+ * IOException raised while skipping is rethrown wrapped in a new IOException after the stream has been cleared.
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ while (fill() > 0) {}
+ } catch (IOException e) {
+ clear();
+ throw new IOException(e);
+ }
+ clear();
+ }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/FileDataIngest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/FileDataIngest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/FileDataIngest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/FileDataIngest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+
+ /**
+  * Splits files into fixed-size chunks and writes them to a table through a BatchWriter.
+  *
+  * Each file is stored in a row named by the hex MD5 of its contents. The row holds reference
+  * entries (original name and extension) under the {@link #REFS_CF} column family and the file
+  * data under the {@link #CHUNK_CF} column family, one entry per chunk, followed by an empty
+  * entry that marks the end of the file.
+  *
+  * Instances reuse a single read buffer and MessageDigest, so one instance must not be used by
+  * several threads at once.
+  */
+ public class FileDataIngest {
+   public static final Text CHUNK_CF = new Text("~chunk");
+   public static final Text REFS_CF = new Text("refs");
+   public static final String REFS_ORIG_FILE = "name";
+   public static final String REFS_FILE_EXT = "filext";
+   public static final ByteSequence CHUNK_CF_BS = new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+   public static final ByteSequence REFS_CF_BS = new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+   int chunkSize;
+   byte[] chunkSizeBytes;
+   byte[] buf;
+   MessageDigest md5digest;
+   ColumnVisibility cv;
+
+   /**
+    * @param chunkSize
+    *          number of bytes stored per chunk entry; also the size of the internal read buffer
+    * @param colvis
+    *          visibility applied to every entry written by this instance
+    */
+   public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+     this.chunkSize = chunkSize;
+     chunkSizeBytes = intToBytes(chunkSize);
+     buf = new byte[chunkSize];
+     try {
+       md5digest = MessageDigest.getInstance("MD5");
+     } catch (NoSuchAlgorithmException e) {
+       throw new RuntimeException(e);
+     }
+     cv = colvis;
+   }
+
+   /**
+    * Reads the named file twice: once to compute the MD5 of its contents, once to write it in
+    * chunks to the given writer.
+    *
+    * The chunk column qualifier is the 4-byte chunk size followed by the 4-byte chunk index, both
+    * big-endian. After the last data chunk an entry with an empty value is written at the next
+    * index.
+    *
+    * @param filename
+    *          path of the file to ingest
+    * @param bw
+    *          writer that receives the mutations; it is neither flushed nor closed here
+    * @return the hex MD5 of the file contents (the row id), or the empty string when the chunk
+    *         size is zero, in which case nothing is read or written
+    * @throws MutationsRejectedException
+    *           if the writer rejects a mutation
+    * @throws IOException
+    *           if the file cannot be opened or read
+    */
+   public String insertFileData(String filename, BatchWriter bw) throws MutationsRejectedException, IOException {
+     if (chunkSize == 0)
+       return "";
+     md5digest.reset();
+     String uid = hexString(md5digest.digest(filename.getBytes()));
+
+     // read through file once, calculating hashes
+     String hash = hashFile(filename);
+     Text row = new Text(hash);
+
+     // write info to accumulo
+     Mutation m = new Mutation(row);
+     m.put(REFS_CF, KeyUtil.buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+     String fext = getExt(filename);
+     if (fext != null)
+       m.put(REFS_CF, KeyUtil.buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+     bw.addMutation(m);
+
+     // read through file again, writing chunks to accumulo
+     int chunkCount = 0;
+     InputStream fis = new FileInputStream(filename);
+     try {
+       int numRead = fis.read(buf);
+       while (numRead >= 0) {
+         // a single read may return fewer bytes than requested; top the buffer up so that
+         // every chunk except the last one is exactly chunkSize bytes long
+         while (numRead < buf.length) {
+           int moreRead = fis.read(buf, numRead, buf.length - numRead);
+           if (moreRead < 0)
+             break;
+           numRead += moreRead;
+         }
+         bw.addMutation(chunkMutation(row, chunkCount, new Value(buf, 0, numRead)));
+         // the end-of-file marker below still needs one more index after the last data chunk
+         if (chunkCount == Integer.MAX_VALUE)
+           throw new RuntimeException("too many chunks for file " + filename + ", try raising chunk size");
+         chunkCount++;
+         numRead = fis.read(buf);
+       }
+     } finally {
+       // release the file handle even when reading or writing fails
+       fis.close();
+     }
+
+     // an empty chunk marks the end of the file
+     bw.addMutation(chunkMutation(row, chunkCount, new Value(new byte[0])));
+     return hash;
+   }
+
+   /** Streams the file through the MD5 digest and returns the digest as a hex string. */
+   private String hashFile(String filename) throws IOException {
+     md5digest.reset();
+     InputStream fis = new FileInputStream(filename);
+     try {
+       int numRead = fis.read(buf);
+       while (numRead >= 0) {
+         if (numRead > 0) {
+           md5digest.update(buf, 0, numRead);
+         }
+         numRead = fis.read(buf);
+       }
+     } finally {
+       fis.close();
+     }
+     return hexString(md5digest.digest());
+   }
+
+   /** Builds the mutation that stores one chunk value under the chunk-size/chunk-index qualifier. */
+   private Mutation chunkMutation(Text row, int chunkCount, Value value) {
+     Text chunkCQ = new Text(chunkSizeBytes);
+     chunkCQ.append(intToBytes(chunkCount), 0, 4);
+     Mutation m = new Mutation(row);
+     m.put(CHUNK_CF, chunkCQ, cv, value);
+     return m;
+   }
+
+   /**
+    * Decodes a big-endian 32-bit integer from four bytes of an array.
+    *
+    * @param b
+    *          source array
+    * @param offset
+    *          index of the most significant byte
+    * @return the decoded value
+    * @throws NumberFormatException
+    *           if fewer than four bytes are available at the given offset
+    */
+   public static int bytesToInt(byte[] b, int offset) {
+     // written as a subtraction so that a huge offset cannot overflow past the check
+     if (offset < 0 || b.length - offset < 4)
+       throw new NumberFormatException("couldn't pull integer from bytes at offset " + offset);
+     return ((b[offset] & 255) << 24) + ((b[offset + 1] & 255) << 16) + ((b[offset + 2] & 255) << 8) + (b[offset + 3] & 255);
+   }
+
+   /**
+    * Encodes an integer as four big-endian bytes.
+    *
+    * @param l
+    *          value to encode
+    * @return a new 4-byte array, most significant byte first
+    */
+   public static byte[] intToBytes(int l) {
+     byte[] b = new byte[4];
+     b[0] = (byte) (l >>> 24);
+     b[1] = (byte) (l >>> 16);
+     b[2] = (byte) (l >>> 8);
+     b[3] = (byte) l;
+     return b;
+   }
+
+   /**
+    * Returns the text after the last dot of the file's name, or null when the name has no dot.
+    * Dots that belong to a directory component of the path are ignored.
+    */
+   private static String getExt(String filename) {
+     int dot = filename.lastIndexOf('.');
+     int sep = Math.max(filename.lastIndexOf('/'), filename.lastIndexOf(java.io.File.separatorChar));
+     if (dot == -1 || dot < sep)
+       return null;
+     return filename.substring(dot + 1);
+   }
+
+   /**
+    * Formats bytes as lower-case hex, two digits per byte.
+    *
+    * @param bytes
+    *          bytes to format
+    * @return the hex string, twice as long as the input
+    */
+   public String hexString(byte[] bytes) {
+     StringBuilder sb = new StringBuilder(bytes.length * 2);
+     for (byte b : bytes) {
+       sb.append(String.format("%02x", b));
+     }
+     return sb.toString();
+   }
+
+   /**
+    * Command-line entry point: connects to the instance, creates the data table if it does not
+    * exist, and ingests every file named after the seventh argument.
+    */
+   public static void main(String[] args) throws Exception {
+     if (args.length < 8) {
+       System.out.println("usage: " + FileDataIngest.class.getSimpleName()
+           + " <instance> <zoo> <user> <pass> <data table> <visibility> <data chunk size> <file>{ <file>}");
+       System.exit(1);
+     }
+
+     String instance = args[0];
+     String zooKeepers = args[1];
+     String user = args[2];
+     String pass = args[3];
+     String dataTable = args[4];
+     ColumnVisibility colvis = new ColumnVisibility(args[5]);
+     int chunkSize = Integer.parseInt(args[6]);
+
+     Connector conn = new ZooKeeperInstance(instance, zooKeepers).getConnector(user, pass.getBytes());
+     if (!conn.tableOperations().exists(dataTable))
+       conn.tableOperations().create(dataTable);
+     BatchWriter bw = conn.createBatchWriter(dataTable, 50000000, 300000L, 4);
+     try {
+       FileDataIngest fdi = new FileDataIngest(chunkSize, colvis);
+       for (int i = 7; i < args.length; i++) {
+         fdi.insertFileData(args[i], bw);
+       }
+     } finally {
+       // close the writer even if one of the files fails to ingest
+       bw.close();
+     }
+   }
+ }
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/FileDataIngest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/FileDataQuery.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/FileDataQuery.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/FileDataQuery.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/FileDataQuery.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.PeekingIterator;
+
+ /**
+  * Retrieves file data written by FileDataIngest: the reference entries of a row are collected
+  * into a list and the chunk entries are exposed through a ChunkInputStream.
+  *
+  * A single scanner and a single ChunkInputStream are created in the constructor and reused by
+  * every call, so a stream returned earlier is reset by the next call to {@link #getData}.
+  */
+ public class FileDataQuery {
+   private Connector conn = null;
+   List<Entry<Key,Value>> lastRefs;
+   private ChunkInputStream cis;
+   Scanner scanner;
+
+   /**
+    * Connects to the instance and creates the scanner used by all later queries.
+    *
+    * @param tableName
+    *          table holding the ingested file data
+    * @param auths
+    *          authorizations passed to the scanner
+    */
+   public FileDataQuery(String instanceName, String zooKeepers, String user, String password, String tableName, Authorizations auths) throws AccumuloException,
+       AccumuloSecurityException, TableNotFoundException {
+     ZooKeeperInstance instance = new ZooKeeperInstance(instanceName, zooKeepers);
+     conn = instance.getConnector(user, password.getBytes());
+     lastRefs = new ArrayList<Entry<Key,Value>>();
+     cis = new ChunkInputStream();
+     scanner = conn.createScanner(tableName, auths);
+   }
+
+   /**
+    * @return the entries that preceded the chunk data in the row read by the most recent
+    *         {@link #getData} call; the list is live and is cleared by the next call
+    */
+   public List<Entry<Key,Value>> getLastRefs() {
+     return lastRefs;
+   }
+
+   /**
+    * Scans the row named by the given hash. Entries that come before the first chunk entry are
+    * stored in the list returned by {@link #getLastRefs}; the rest of the iterator is handed to
+    * the shared ChunkInputStream.
+    *
+    * @param hash
+    *          row id of the file
+    * @return the shared stream, reset and positioned on the row's first chunk entry
+    */
+   public ChunkInputStream getData(String hash) {
+     scanner.setRange(new Range(hash));
+     scanner.setBatchSize(1);
+     lastRefs.clear();
+     PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(scanner.iterator());
+     // hasNext() is re-checked on every step: a row may hold reference entries but no chunk
+     // entries, and peeking past the end of the iterator must be avoided
+     while (pi.hasNext() && !pi.peek().getKey().getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
+       lastRefs.add(pi.next());
+     }
+     cis.clear();
+     cis.setSource(pi);
+     return cis;
+   }
+
+   /**
+    * Reads up to numBytes bytes from the start of the file stored under the given hash.
+    *
+    * @param hash
+    *          row id of the file
+    * @param numBytes
+    *          maximum number of bytes to read
+    * @return the bytes actually read, decoded with the platform default charset; the empty string
+    *         when the stream reports end of input straight away
+    * @throws IOException
+    *           if reading from the chunk stream fails
+    */
+   public String getSomeData(String hash, int numBytes) throws IOException {
+     ChunkInputStream is = getData(hash);
+     byte[] buf = new byte[numBytes];
+     int numRead = is.read(buf);
+     if (numRead < 0) {
+       return "";
+     }
+     // decode only what was read, so a short file does not yield trailing NUL characters
+     return new String(buf, 0, numRead);
+   }
+ }
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/FileDataQuery.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/KeyUtil.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/KeyUtil.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/KeyUtil.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/KeyUtil.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.io.Text;
+
+ /**
+  * Helpers for packing several strings into one Text, separated by a zero byte, and for
+  * unpacking them again. Strings are encoded and decoded as UTF-8, which is also what the
+  * Text(String) constructor used for the first component produces.
+  */
+ public class KeyUtil {
+   public static final byte[] nullbyte = new byte[] {0};
+
+   // fully qualified so that the file's import list does not need to change
+   private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
+
+   /**
+    * Joins the given strings into a single Text with a zero byte between neighbours.
+    *
+    * @param s
+    *          the components, at least one; components should not contain the NUL character or
+    *          they cannot be told apart when split again
+    * @return a new Text holding the joined components
+    */
+   public static Text buildNullSepText(String... s) {
+     Text t = new Text(s[0]);
+     for (int i = 1; i < s.length; i++) {
+       t.append(nullbyte, 0, 1);
+       // append the encoded byte count, not the char count: they differ for non-ASCII text
+       byte[] encoded = s[i].getBytes(UTF8);
+       t.append(encoded, 0, encoded.length);
+     }
+     return t;
+   }
+
+   /**
+    * Splits a Text at every zero byte.
+    *
+    * @param t
+    *          text produced by {@link #buildNullSepText}
+    * @return the components in order; always at least one element, and an empty Text yields a
+    *         single empty string
+    */
+   public static String[] splitNullSepText(Text t) {
+     ArrayList<String> s = new ArrayList<String>();
+     // only the first getLength() bytes of the backing array are valid
+     byte[] b = t.getBytes();
+     int lastindex = 0;
+     for (int i = 0; i < t.getLength(); i++) {
+       if (b[i] == (byte) 0) {
+         s.add(new String(b, lastindex, i - lastindex, UTF8));
+         lastindex = i + 1;
+       }
+     }
+     s.add(new String(b, lastindex, t.getLength() - lastindex, UTF8));
+     return s.toArray(new String[s.size()]);
+   }
+ }
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/KeyUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/VisibilityCombiner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/VisibilityCombiner.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/VisibilityCombiner.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/VisibilityCombiner.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.data.ByteSequence;
+
+ /**
+  * Accumulates column visibility expressions and combines them into one expression that is the
+  * OR of all of them.
+  *
+  * Each added expression is split at its top-level '|' operators. Every resulting term is
+  * stored once, wrapped in parentheses unless a single pair of parentheses already encloses the
+  * whole term. Terms are kept sorted so the combined output does not depend on insertion order.
+  */
+ public class VisibilityCombiner {
+
+   private TreeSet<String> visibilities = new TreeSet<String>();
+
+   /**
+    * Adds the top-level OR terms of the given expression. An empty expression is ignored.
+    *
+    * The expression is validated completely before anything is stored, so a rejected expression
+    * leaves this combiner unchanged.
+    *
+    * @param cv
+    *          visibility expression
+    * @throws IllegalArgumentException
+    *           if the parentheses are unbalanced or a top-level term is empty
+    */
+   void add(ByteSequence cv) {
+     if (cv.length() == 0)
+       return;
+
+     // terms are collected here first and only merged once the whole expression is known to be valid
+     TreeSet<String> pending = new TreeSet<String>();
+     int depth = 0;
+     int offset = 0;
+
+     for (int i = 0; i < cv.length(); i++) {
+       switch (cv.byteAt(i)) {
+         case '(':
+           depth++;
+           break;
+         case ')':
+           depth--;
+           if (depth < 0)
+             throw new IllegalArgumentException("Invalid vis " + cv);
+           break;
+         case '|':
+           // only an OR outside all parentheses separates terms
+           if (depth == 0) {
+             pending.add(normalize(cv.subSequence(offset, i), cv));
+             offset = i + 1;
+           }
+           break;
+       }
+     }
+
+     if (depth != 0)
+       throw new IllegalArgumentException("Invalid vis " + cv);
+
+     pending.add(normalize(cv.subSequence(offset, cv.length()), cv));
+     visibilities.addAll(pending);
+   }
+
+   /**
+    * Returns the term as a string, wrapped in parentheses unless it already is a single
+    * parenthesized group.
+    *
+    * @param term
+    *          one top-level OR term with balanced parentheses
+    * @param whole
+    *          the expression the term was cut from, used in the error message
+    */
+   private static String normalize(ByteSequence term, ByteSequence whole) {
+     if (term.length() == 0)
+       throw new IllegalArgumentException("Invalid vis " + whole);
+
+     String cvs = term.toString();
+     return isSingleGroup(term) ? cvs : "(" + cvs + ")";
+   }
+
+   /**
+    * Tells whether the parenthesis opened at the first byte is closed by the last byte, i.e. the
+    * whole term is one group. A term such as "(a)&b" starts with '(' but is not a single group.
+    */
+   private static boolean isSingleGroup(ByteSequence term) {
+     if (term.byteAt(0) != '(')
+       return false;
+
+     int depth = 0;
+     for (int i = 0; i < term.length(); i++) {
+       switch (term.byteAt(i)) {
+         case '(':
+           depth++;
+           break;
+         case ')':
+           depth--;
+           // the first return to depth zero is where the leading group ends
+           if (depth == 0)
+             return i == term.length() - 1;
+           break;
+       }
+     }
+     return false;
+   }
+
+   /**
+    * @return the stored terms joined with '|', encoded with the platform default charset; an empty
+    *         array when nothing has been added
+    */
+   byte[] get() {
+     StringBuilder sb = new StringBuilder();
+     String sep = "";
+     for (String cvs : visibilities) {
+       sb.append(sep);
+       sep = "|";
+       sb.append(cvs);
+     }
+
+     return sb.toString().getBytes();
+   }
+ }
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/VisibilityCombiner.java
------------------------------------------------------------------------------
svn:eol-style = native