You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/04/29 18:25:10 UTC
[4/9] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT
Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT
Conflicts:
core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e8f98e74
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e8f98e74
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e8f98e74
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: e8f98e74a57735be4b6eeb488ac19aaca6bc6fb6
Parents: f5bcee7 ff8c238
Author: Josh Elser <el...@apache.org>
Authored: Tue Apr 29 11:47:41 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Apr 29 12:08:43 2014 -0400
----------------------------------------------------------------------
.../client/mock/MockTableOperationsImpl.java | 4 +-
.../client/mock/MockTableOperationsTest.java | 57 ++++++++++++++++++++
2 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e8f98e74/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperationsImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperationsImpl.java
index fea9568,0000000..8a8895f
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperationsImpl.java
@@@ -1,447 -1,0 +1,447 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mock;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.NamespaceNotFoundException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.DiskUsage;
+import org.apache.accumulo.core.client.admin.FindMax;
+import org.apache.accumulo.core.client.impl.TableOperationsHelper;
+import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+class MockTableOperationsImpl extends TableOperationsHelper {
+ private static final byte[] ZERO = {0};
+ private final MockAccumulo acu;
+ private final String username;
+
+ /**
+  * Creates table operations bound to a mock Accumulo instance.
+  *
+  * @param acu
+  *          mock instance whose tables and namespaces this object reads and modifies
+  * @param username
+  *          user name handed to the mock instance when a table is created
+  */
+ MockTableOperationsImpl(MockAccumulo acu, String username) {
+   this.username = username;
+   this.acu = acu;
+ }
+
+ /**
+  * Returns the names of all tables in the mock instance as a sorted copy.
+  */
+ @Override
+ public SortedSet<String> list() {
+   SortedSet<String> names = new TreeSet<String>();
+   names.addAll(acu.tables.keySet());
+   return names;
+ }
+
+ /**
+  * Reports whether a table with the given name is registered in the mock instance.
+  */
+ @Override
+ public boolean exists(String tableName) {
+   final boolean present = acu.tables.containsKey(tableName);
+   return present;
+ }
+
+ /**
+  * Reports whether a namespace with the given name is registered in the mock instance.
+  */
+ private boolean namespaceExists(String namespace) {
+   final boolean present = acu.namespaces.containsKey(namespace);
+   return present;
+ }
+
+ /**
+  * Creates a table with the versioning iterator enabled and {@link TimeType#MILLIS} timestamps.
+  */
+ @Override
+ public void create(String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException {
+   final boolean versioningIter = true;
+   create(tableName, versioningIter, TimeType.MILLIS);
+ }
+
+ /**
+  * Creates a table with {@link TimeType#MILLIS} timestamps, with or without the versioning iterator.
+  */
+ @Override
+ public void create(String tableName, boolean versioningIter) throws AccumuloException, AccumuloSecurityException, TableExistsException {
+   final TimeType defaultTimeType = TimeType.MILLIS;
+   create(tableName, versioningIter, defaultTimeType);
+ }
+
+ /**
+  * Creates a table in the mock instance after validating its name.
+  * <p>
+  * The checks run in this order: the name must match {@code Tables.VALID_NAME_REGEX}, the table must not exist yet, and the namespace part of the name must
+  * already exist.
+  *
+  * @throws IllegalArgumentException
+  *           if the name is invalid or the namespace does not exist
+  * @throws TableExistsException
+  *           if a table with this name is already registered
+  */
+ @Override
+ public void create(String tableName, boolean versioningIter, TimeType timeType) throws AccumuloException, AccumuloSecurityException, TableExistsException {
+   final String namespace = Tables.qualify(tableName).getFirst();
+   final boolean validName = tableName.matches(Tables.VALID_NAME_REGEX);
+   if (!validName) {
+     throw new IllegalArgumentException();
+   }
+   if (exists(tableName)) {
+     throw new TableExistsException(tableName, tableName, "");
+   }
+
+   if (namespaceExists(namespace)) {
+     acu.createTable(username, tableName, versioningIter, timeType);
+     return;
+   }
+   throw new IllegalArgumentException("Namespace (" + namespace + ") does not exist, create it first");
+ }
+
+ /**
+  * Adds split points to an existing table by delegating to the mock instance.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void addSplits(String tableName, SortedSet<Text> partitionKeys) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+   if (exists(tableName)) {
+     acu.addSplits(tableName, partitionKeys);
+     return;
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Returns the split points of a table.
+  *
+  * @deprecated delegates to {@link #listSplits(String)}
+  */
+ @Deprecated
+ @Override
+ public Collection<Text> getSplits(String tableName) throws TableNotFoundException {
+   final Collection<Text> splits = listSplits(tableName);
+   return splits;
+ }
+
+ /**
+  * Returns the split points of a table; {@code maxSplits} is ignored by this mock.
+  *
+  * @deprecated delegates to {@link #listSplits(String)}
+  */
+ @Deprecated
+ @Override
+ public Collection<Text> getSplits(String tableName, int maxSplits) throws TableNotFoundException {
+   final Collection<Text> splits = listSplits(tableName);
+   return splits;
+ }
+
+ /**
+  * Returns the split points the mock instance holds for a table.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public Collection<Text> listSplits(String tableName) throws TableNotFoundException {
+   if (exists(tableName)) {
+     return acu.getSplits(tableName);
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Returns all split points of a table; {@code maxSplits} is ignored by this mock.
+  */
+ @Override
+ public Collection<Text> listSplits(String tableName, int maxSplits) throws TableNotFoundException {
+   final Collection<Text> allSplits = listSplits(tableName);
+   return allSplits;
+ }
+
+ /**
+  * Removes a table from the mock instance.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+   if (exists(tableName)) {
+     acu.tables.remove(tableName);
+     return;
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Re-registers a table under a new name and attaches it to the namespace named by the new name.
+  * <p>
+  * If that namespace is not registered yet, a fresh {@code MockNamespace} is created and registered for it.
+  *
+  * @throws TableNotFoundException
+  *           if {@code oldTableName} does not exist
+  * @throws TableExistsException
+  *           if {@code newTableName} is already taken
+  */
+ @Override
+ public void rename(String oldTableName, String newTableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException,
+     TableExistsException {
+   if (!exists(oldTableName)) {
+     throw new TableNotFoundException(oldTableName, oldTableName, "");
+   }
+   if (exists(newTableName)) {
+     throw new TableExistsException(newTableName, newTableName, "");
+   }
+   final MockTable moved = acu.tables.remove(oldTableName);
+   final String namespace = Tables.qualify(newTableName).getFirst();
+   final MockNamespace registered = acu.namespaces.get(namespace);
+   final MockNamespace target = registered != null ? registered : new MockNamespace();
+   moved.setNamespaceName(namespace);
+   moved.setNamespace(target);
+   acu.namespaces.put(namespace, target);
+   acu.tables.put(newTableName, moved);
+ }
+
+ /**
+  * Does nothing in this mock.
+  *
+  * @deprecated see {@link #flush(String, Text, Text, boolean)}
+  */
+ @Deprecated
+ @Override
+ public void flush(String tableName) throws AccumuloException, AccumuloSecurityException {
+   // intentionally empty
+ }
+
+ /**
+  * Stores a property in the settings of the named table. No existence check is performed here.
+  */
+ @Override
+ public void setProperty(String tableName, String property, String value) throws AccumuloException, AccumuloSecurityException {
+   final MockTable table = acu.tables.get(tableName);
+   table.settings.put(property, value);
+ }
+
+ /**
+  * Removes a property from the settings of the named table. No existence check is performed here.
+  */
+ @Override
+ public void removeProperty(String tableName, String property) throws AccumuloException, AccumuloSecurityException {
+   final MockTable table = acu.tables.get(tableName);
+   table.settings.remove(property);
+ }
+
+ /**
+  * Returns the effective properties of a table: the settings of its namespace overlaid with the table's own settings.
+  * <p>
+  * Entries are merged by property name, so a table-level value replaces the namespace-level value of the same property and every name is reported once. The
+  * returned entries are copies; changing them does not affect the mock's settings.
+  *
+  * @param tableName
+  *          name of the table, optionally qualified with a namespace
+  * @return the merged name/value pairs
+  * @throws TableNotFoundException
+  *           if the table does not exist; when its namespace does not exist either, the cause is a {@link NamespaceNotFoundException}
+  */
+ @Override
+ public Iterable<Entry<String,String>> getProperties(String tableName) throws TableNotFoundException {
+   String namespace = Tables.qualify(tableName).getFirst();
+   if (!exists(tableName)) {
+     if (!namespaceExists(namespace))
+       throw new TableNotFoundException(tableName, new NamespaceNotFoundException(null, namespace, null));
+     throw new TableNotFoundException(null, tableName, null);
+   }
+
+   // Merge on the property name. Comparing whole entries (name and value) would keep both the
+   // namespace value and the overriding table value for the same property.
+   Map<String,String> merged = new HashMap<String,String>();
+   for (Entry<String,String> e : acu.namespaces.get(namespace).settings.entrySet()) {
+     merged.put(e.getKey(), e.getValue());
+   }
+   for (Entry<String,String> e : acu.tables.get(tableName).settings.entrySet()) {
+     merged.put(e.getKey(), e.getValue());
+   }
+   return merged.entrySet();
+ }
+
+ /**
+  * Replaces the locality groups of an existing table.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void setLocalityGroups(String tableName, Map<String,Set<Text>> groups) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+   if (exists(tableName)) {
+     acu.tables.get(tableName).setLocalityGroups(groups);
+     return;
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Returns the locality groups of an existing table.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public Map<String,Set<Text>> getLocalityGroups(String tableName) throws AccumuloException, TableNotFoundException {
+   if (exists(tableName)) {
+     final MockTable table = acu.tables.get(tableName);
+     return table.getLocalityGroups();
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Returns the given range unsplit, as a single-element set; {@code maxSplits} is ignored by this mock.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public Set<Range> splitRangeByTablets(String tableName, Range range, int maxSplits) throws AccumuloException, AccumuloSecurityException,
+     TableNotFoundException {
+   if (exists(tableName)) {
+     return Collections.singleton(range);
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Imports every file found in a directory into a table by replaying the file's entries as mutations.
+  * <p>
+  * Before importing, this checks that neither path is a regular file, that the failure directory is writable (by creating and deleting a probe file) and that
+  * the failure directory is empty. A file whose import throws is copied into the failure directory. Every file is deleted from the import directory
+  * afterwards, whether or not its import succeeded.
+  *
+  * @param tableName
+  *          table that receives the entries
+  * @param dir
+  *          directory holding the files to import
+  * @param failureDir
+  *          directory that receives copies of files that could not be imported
+  * @param setTime
+  *          if true, every imported key is stamped with the time captured at the start of this call
+  * @throws IOException
+  *           if a precondition fails, or if copying a failed file or closing a file reader fails
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void importDirectory(String tableName, String dir, String failureDir, boolean setTime) throws IOException, AccumuloException,
+     AccumuloSecurityException, TableNotFoundException {
+   long time = System.currentTimeMillis();
+   MockTable table = acu.tables.get(tableName);
+   if (table == null) {
+     throw new TableNotFoundException(null, tableName, "The table was not found");
+   }
+   Path importPath = new Path(dir);
+   Path failurePath = new Path(failureDir);
+
+   FileSystem fs = acu.getFileSystem();
+   /*
+    * check preconditions
+    */
+   // directories are directories
+   if (fs.isFile(importPath)) {
+     throw new IOException("Import path must be a directory.");
+   }
+   if (fs.isFile(failurePath)) {
+     throw new IOException("Failure path must be a directory.");
+   }
+   // failures are writable
+   Path createPath = failurePath.suffix("/.createFile");
+   FSDataOutputStream createStream = null;
+   try {
+     createStream = fs.create(createPath);
+   } catch (IOException e) {
+     // keep the underlying failure as the cause so the reason stays visible to the caller
+     throw new IOException("Error path is not writable.", e);
+   } finally {
+     if (createStream != null) {
+       createStream.close();
+     }
+   }
+   fs.delete(createPath, false);
+   // failures are empty
+   FileStatus[] failureChildStats = fs.listStatus(failurePath);
+   if (failureChildStats.length > 0) {
+     throw new IOException("Error path must be empty.");
+   }
+   /*
+    * Begin the import - iterate the files in the path
+    */
+   for (FileStatus importStatus : fs.listStatus(importPath)) {
+     FileSKVIterator importIterator = null;
+     try {
+       importIterator = FileOperations.getInstance().openReader(importStatus.getPath().toString(), true, fs, fs.getConf(),
+           AccumuloConfiguration.getDefaultConfiguration());
+       while (importIterator.hasTop()) {
+         Key key = importIterator.getTopKey();
+         Value value = importIterator.getTopValue();
+         if (setTime) {
+           key.setTimestamp(time);
+         }
+         Mutation mutation = new Mutation(key.getRow());
+         if (!key.isDeleted()) {
+           mutation.put(key.getColumnFamily(), key.getColumnQualifier(), new ColumnVisibility(key.getColumnVisibilityData().toArray()), key.getTimestamp(),
+               value);
+         } else {
+           mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new ColumnVisibility(key.getColumnVisibilityData().toArray()),
+               key.getTimestamp());
+         }
+         table.addMutation(mutation);
+         importIterator.next();
+       }
+     } catch (Exception e) {
+       // the file could not be imported: preserve a copy of it in the failure directory
+       FSDataOutputStream failureWriter = null;
+       DataInputStream failureReader = null;
+       try {
+         failureWriter = fs.create(failurePath.suffix("/" + importStatus.getPath().getName()));
+         failureReader = fs.open(importStatus.getPath());
+         int read = 0;
+         byte[] buffer = new byte[1024];
+         while (-1 != (read = failureReader.read(buffer))) {
+           failureWriter.write(buffer, 0, read);
+         }
+       } finally {
+         if (failureReader != null)
+           failureReader.close();
+         if (failureWriter != null)
+           failureWriter.close();
+       }
+     } finally {
+       // release the file reader before the file is deleted below
+       if (importIterator != null) {
+         importIterator.close();
+       }
+     }
+     fs.delete(importStatus.getPath(), true);
+   }
+ }
+
+ /**
+  * Same as {@link #offline(String, boolean)} with {@code wait} set to false.
+  */
+ @Override
+ public void offline(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+   final boolean wait = false;
+   offline(tableName, wait);
+ }
+
+ /**
+  * Only verifies that the table exists; this mock performs no other action.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void offline(String tableName, boolean wait) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+   if (exists(tableName)) {
+     return;
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Same as {@link #online(String, boolean)} with {@code wait} set to false.
+  */
+ @Override
+ public void online(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+   final boolean wait = false;
+   online(tableName, wait);
+ }
+
+ /**
+  * Only verifies that the table exists; this mock performs no other action.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void online(String tableName, boolean wait) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+   if (exists(tableName)) {
+     return;
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Only verifies that the table exists; this mock performs no other action.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void clearLocatorCache(String tableName) throws TableNotFoundException {
+   if (exists(tableName)) {
+     return;
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Maps every table name to its id. The root and metadata tables map to their fixed ids; every other table uses its own name as its id.
+  */
+ @Override
+ public Map<String,String> tableIdMap() {
+   final Map<String,String> ids = new HashMap<String,String>();
+   for (String name : acu.tables.keySet()) {
+     final String id;
+     if (RootTable.NAME.equals(name)) {
+       id = RootTable.ID;
+     } else if (MetadataTable.NAME.equals(name)) {
+       id = MetadataTable.ID;
+     } else {
+       id = name;
+     }
+     ids.put(name, id);
+   }
+   return ids;
+ }
+
+ /**
+  * Reports a single usage record that lists all the given tables, sorted, with a usage of zero.
+  */
+ @Override
+ public List<DiskUsage> getDiskUsage(Set<String> tables) throws AccumuloException, AccumuloSecurityException {
+   final TreeSet<String> sortedNames = new TreeSet<String>(tables);
+   final List<DiskUsage> usages = new ArrayList<DiskUsage>();
+   usages.add(new DiskUsage(sortedNames, 0L));
+   return usages;
+ }
+
+ /**
+  * Merges a row range of an existing table by delegating to the mock instance.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void merge(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+   if (exists(tableName)) {
+     acu.merge(tableName, start, end);
+     return;
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Deletes every entry whose row is greater than {@code start} and less than or equal to {@code end}.
+  * <p>
+  * A null bound leaves that side of the range open. An empty table, or a {@code start} that sorts after the last row, deletes nothing.
+  *
+  * @param tableName
+  *          table to delete from
+  * @param start
+  *          exclusive lower row bound, or null to begin at the first row
+  * @param end
+  *          inclusive upper row bound, or null to continue through the last row
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void deleteRows(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+   if (!exists(tableName))
+     throw new TableNotFoundException(tableName, tableName, "");
+   MockTable t = acu.tables.get(tableName);
- Text startText = new Text(start);
- Text endText = new Text(end);
++  // Appending a zero byte to a row gives the smallest row that sorts after it. Using that as the
++  // lower bound excludes the start row; using it as the upper bound includes the end row.
++  Key startKey = null;
++  if (start != null) {
++    Text startText = new Text(start);
++    startText.append(ZERO, 0, 1);
++    startKey = new Key(startText);
++  }
++  Key endKey = null;
++  if (end != null) {
++    Text endText = new Text(end);
++    endText.append(ZERO, 0, 1);
++    endKey = new Key(endText);
++  }
++  // Open-ended views need neither lastKey(), which fails on an empty table, nor an invented
++  // bound, which makes subMap reject a start that sorts after the last row.
++  Collection<? extends Key> range;
++  if (startKey == null && endKey == null) {
++    range = t.table.keySet();
++  } else if (startKey == null) {
++    range = t.table.headMap(endKey).keySet();
++  } else if (endKey == null) {
++    range = t.table.tailMap(startKey).keySet();
++  } else {
++    range = t.table.subMap(startKey, endKey).keySet();
++  }
+   // copy first: the range is a live view of the map that is about to be modified
+   Set<Key> doomed = new TreeSet<Key>(range);
+   t.table.keySet().removeAll(doomed);
+ }
+
+ /**
+  * Only verifies that the table exists; this mock performs no compaction.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void compact(String tableName, Text start, Text end, boolean flush, boolean wait) throws AccumuloSecurityException, TableNotFoundException,
+     AccumuloException {
+   if (exists(tableName)) {
+     return;
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Only verifies that the table exists; this mock performs no compaction and ignores the iterators.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void compact(String tableName, Text start, Text end, List<IteratorSetting> iterators, boolean flush, boolean wait) throws AccumuloSecurityException,
+     TableNotFoundException, AccumuloException {
+   if (exists(tableName)) {
+     return;
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Only verifies that the table exists; this mock performs no other action.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void cancelCompaction(String tableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
+   if (exists(tableName)) {
+     return;
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Not supported by this mock.
+  *
+  * @throws NotImplementedException
+  *           always
+  */
+ @Override
+ public void clone(String srcTableName, String newTableName, boolean flush, Map<String,String> propertiesToSet, Set<String> propertiesToExclude)
+     throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException {
+   final NotImplementedException unsupported = new NotImplementedException();
+   throw unsupported;
+ }
+
+ /**
+  * Only verifies that the table exists; this mock performs no flush.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public void flush(String tableName, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+   if (exists(tableName)) {
+     return;
+   }
+   throw new TableNotFoundException(tableName, tableName, "");
+ }
+
+ /**
+  * Finds the maximum row within the given bounds by running {@code FindMax} over a mock scanner of the table.
+  *
+  * @throws TableNotFoundException
+  *           if the table does not exist
+  */
+ @Override
+ public Text getMaxRow(String tableName, Authorizations auths, Text startRow, boolean startInclusive, Text endRow, boolean endInclusive)
+     throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+   final MockTable table = acu.tables.get(tableName);
+   if (table != null) {
+     final MockScanner scanner = new MockScanner(table, auths);
+     return FindMax.findMax(scanner, startRow, startInclusive, endRow, endInclusive);
+   }
+
+   throw new TableNotFoundException(tableName, tableName, "no such table");
+ }
+
+ /**
+  * Not supported by this mock.
+  *
+  * @throws NotImplementedException
+  *           always
+  */
+ @Override
+ public void importTable(String tableName, String exportDir) throws TableExistsException, AccumuloException, AccumuloSecurityException {
+   final NotImplementedException unsupported = new NotImplementedException();
+   throw unsupported;
+ }
+
+ /**
+  * Not supported by this mock.
+  *
+  * @throws NotImplementedException
+  *           always
+  */
+ @Override
+ public void exportTable(String tableName, String exportDir) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+   final NotImplementedException unsupported = new NotImplementedException();
+   throw unsupported;
+ }
+
+ /**
+  * Tries to load {@code className} as a subtype of {@code asTypeName} through the VFS class loader; {@code tableName} is not consulted.
+  *
+  * @return true if loading succeeded, false if either class could not be found (the stack trace is printed)
+  */
+ @Override
+ public boolean testClassLoad(String tableName, String className, String asTypeName) throws AccumuloException, AccumuloSecurityException,
+     TableNotFoundException {
+
+   boolean loaded;
+   try {
+     final Class<?> asType = Class.forName(asTypeName);
+     AccumuloVFSClassLoader.loadClass(className, asType);
+     loaded = true;
+   } catch (ClassNotFoundException e) {
+     e.printStackTrace();
+     loaded = false;
+   }
+   return loaded;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e8f98e74/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
index 94dbed1,ea916e7..138ab93
--- a/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
@@@ -276,4 -276,61 +276,61 @@@ public class MockTableOperationsTest
Assert.assertEquals(5, oneCnt);
}
+ // Verifies that deleteRows accepts a null start row, a null end row, or both, treating a null
+ // bound as an open end of the range.
+ @Test
+ public void testDeleteRowsWithNullKeys() throws Exception {
+ Instance instance = new MockInstance("rows");
+ Connector connector = instance.getConnector("user", new PasswordToken("foo"));
+ TableOperations to = connector.tableOperations();
+ to.create("test2");
+ BatchWriter bw = connector.createBatchWriter("test2", new BatchWriterConfig());
+ // 30 rows named "0" through "29", 5 columns each: 150 entries in total.
+ // Rows are strings, so they sort as text: "0", "1", "10", ..., "19", "2", "20", ..., "29", "3", ..., "9".
+ for (int r = 0; r < 30; r++) {
+ Mutation m = new Mutation(Integer.toString(r));
+ for (int c = 0; c < 5; c++) {
+ m.put(new Text("cf"), new Text(Integer.toString(c)), new Value(Integer.toString(c).getBytes()));
+ }
+ bw.addMutation(m);
+ }
+ bw.flush();
+
+ // test null end
+ // the start row is exclusive and no row "30" exists, so this removes every row sorting after "30":
+ // rows "4" through "9" (6 rows * 5 columns = 30 entries), leaving 120
+ to.deleteRows("test2", new Text("30"), null);
- Scanner s = connector.createScanner("test2", Constants.NO_AUTHS);
++ Scanner s = connector.createScanner("test2", Authorizations.EMPTY);
+ int rowCnt = 0;
+ for (Entry<Key,Value> entry : s) {
+ String rowId = entry.getKey().getRow().toString();
+ Assert.assertFalse(rowId.startsWith("30"));
+ rowCnt++;
+ }
+ s.close();
+ Assert.assertEquals(120, rowCnt);
+
+ // test null start
+ // the end row is inclusive, so this removes every row up to and including "2":
+ // rows "0", "1", "10" through "19" and "2" (13 rows * 5 columns = 65 entries), leaving 55
+ to.deleteRows("test2", null, new Text("2"));
- s = connector.createScanner("test2", Constants.NO_AUTHS);
++ s = connector.createScanner("test2", Authorizations.EMPTY);
+ rowCnt = 0;
+ for (Entry<Key,Value> entry : s) {
+ char rowStart = entry.getKey().getRow().toString().charAt(0);
+ Assert.assertTrue(rowStart >= '2');
+ rowCnt++;
+ }
+ s.close();
+ Assert.assertEquals(55, rowCnt);
+
+ // test null start and end
+ // deletes everything still left (rows "20" through "29" and "3")
+ to.deleteRows("test2", null, null);
- s = connector.createScanner("test2", Constants.NO_AUTHS);
++ s = connector.createScanner("test2", Authorizations.EMPTY);
+ rowCnt = 0;
+ for (@SuppressWarnings("unused")
+ Entry<Key,Value> entry : s) {
+ rowCnt++;
+ }
+ s.close();
+ to.delete("test2");
+ Assert.assertEquals(0, rowCnt);
+
+ }
+
}