Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/04/29 03:03:45 UTC
[8/9] accumulo git commit: ACCUMULO-3759 Fix Java 8 compiler warnings
ACCUMULO-3759 Fix Java 8 compiler warnings
* Add missing hashCode to a class that overrides equals
* Enforce one type per file
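
For context on the first fix: a class that overrides equals() without hashCode() breaks the
java.lang.Object contract (equal objects must report equal hash codes), which lint tools flag.
A minimal sketch mirroring the EmptyToken fix in TestClientOpts below:

    // All EmptyToken instances compare equal, so a constant hash code satisfies the contract.
    @Override
    public boolean equals(Object o) {
      return o instanceof EmptyToken;
    }

    @Override
    public int hashCode() {
      return 0;
    }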
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6e2e6780
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6e2e6780
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6e2e6780
Branch: refs/heads/master
Commit: 6e2e6780fc59c86112fba30a5211081bb6e77979
Parents: f996387
Author: Christopher Tubbs <ct...@apache.org>
Authored: Tue Apr 28 20:30:22 2015 -0400
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Tue Apr 28 20:30:22 2015 -0400
----------------------------------------------------------------------
.../core/client/impl/OfflineIterator.java | 340 ++++++++++++
.../core/client/impl/OfflineScanner.java | 314 -----------
.../core/compaction/CompactionSettings.java | 42 --
.../accumulo/core/compaction/PatternType.java | 28 +
.../accumulo/core/compaction/SizeType.java | 30 ++
.../accumulo/core/compaction/StringType.java | 24 +
.../apache/accumulo/core/compaction/Type.java | 21 +
.../accumulo/core/compaction/UIntType.java | 27 +
.../core/file/DispatchingFileFactory.java | 136 +++++
.../accumulo/core/file/FileOperations.java | 106 ----
.../accumulo/core/cli/TestClientOpts.java | 5 +
.../client/CountingVerifyingReceiver.java | 64 +++
.../simple/client/RandomBatchScanner.java | 38 --
pom.xml | 1 +
.../accumulo/master/tableOps/BulkImport.java | 363 -------------
.../master/tableOps/CancelCompactions.java | 23 -
.../accumulo/master/tableOps/ChooseDir.java | 53 ++
.../accumulo/master/tableOps/CleanUp.java | 287 ++++++++++
.../master/tableOps/CleanUpBulkImport.java | 64 +++
.../accumulo/master/tableOps/CloneInfo.java | 36 ++
.../accumulo/master/tableOps/CloneMetadata.java | 54 ++
.../master/tableOps/ClonePermissions.java | 73 +++
.../accumulo/master/tableOps/CloneTable.java | 195 -------
.../master/tableOps/CloneZookeeper.java | 76 +++
.../accumulo/master/tableOps/CompactRange.java | 159 ------
.../master/tableOps/CompactionDriver.java | 188 +++++++
.../master/tableOps/CompleteBulkImport.java | 45 ++
.../accumulo/master/tableOps/CopyFailed.java | 158 ++++++
.../accumulo/master/tableOps/CreateDir.java | 51 ++
.../master/tableOps/CreateImportDir.java | 61 +++
.../master/tableOps/CreateNamespace.java | 137 -----
.../accumulo/master/tableOps/CreateTable.java | 251 ---------
.../master/tableOps/DeleteNamespace.java | 55 --
.../accumulo/master/tableOps/DeleteTable.java | 265 ----------
.../accumulo/master/tableOps/ExportInfo.java | 29 ++
.../accumulo/master/tableOps/ExportTable.java | 257 ---------
.../master/tableOps/FinishCancelCompaction.java | 40 ++
.../master/tableOps/FinishCloneTable.java | 64 +++
.../master/tableOps/FinishCreateNamespace.java | 58 +++
.../master/tableOps/FinishCreateTable.java | 62 +++
.../master/tableOps/FinishImportTable.java | 68 +++
.../tableOps/ImportPopulateZookeeper.java | 104 ++++
.../master/tableOps/ImportSetupPermissions.java | 65 +++
.../accumulo/master/tableOps/ImportTable.java | 521 -------------------
.../master/tableOps/ImportedTableInfo.java | 31 ++
.../accumulo/master/tableOps/LoadFiles.java | 209 ++++++++
.../master/tableOps/MapImportFileNames.java | 111 ++++
.../master/tableOps/MoveExportedFiles.java | 71 +++
.../master/tableOps/NamespaceCleanUp.java | 75 +++
.../accumulo/master/tableOps/NamespaceInfo.java | 31 ++
.../master/tableOps/PopulateMetadata.java | 54 ++
.../master/tableOps/PopulateMetadataTable.java | 217 ++++++++
.../master/tableOps/PopulateZookeeper.java | 77 +++
.../PopulateZookeeperWithNamespace.java | 74 +++
.../tableOps/SetupNamespacePermissions.java | 55 ++
.../master/tableOps/SetupPermissions.java | 63 +++
.../accumulo/master/tableOps/TableInfo.java | 35 ++
.../accumulo/master/tableOps/TableRangeOp.java | 45 --
.../master/tableOps/TableRangeOpWait.java | 69 +++
.../master/tableOps/WriteExportFiles.java | 268 ++++++++++
.../apache/accumulo/tserver/InMemoryMap.java | 119 -----
.../accumulo/tserver/MemKeyComparator.java | 44 ++
.../tserver/MemKeyConversionIterator.java | 96 ++++
.../PartialMutationSkippingIterator.java | 54 ++
.../accumulo/test/EstimateInMemMapOverhead.java | 317 -----------
.../test/InMemoryMapMemoryUsageTest.java | 102 ++++
.../accumulo/test/IntObjectMemoryUsageTest.java | 65 +++
.../apache/accumulo/test/MemoryUsageTest.java | 64 +++
.../accumulo/test/MutationMemoryUsageTest.java | 98 ++++
.../accumulo/test/TextMemoryUsageTest.java | 82 +++
.../accumulo/test/continuous/HistData.java | 49 ++
.../accumulo/test/continuous/Histogram.java | 30 --
72 files changed, 4406 insertions(+), 3237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
new file mode 100644
index 0000000..b035e3e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.VolumeConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+class OfflineIterator implements Iterator<Entry<Key,Value>> {
+
+ static class OfflineIteratorEnvironment implements IteratorEnvironment {
+
+ private final Authorizations authorizations;
+
+ public OfflineIteratorEnvironment(Authorizations auths) {
+ this.authorizations = auths;
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public AccumuloConfiguration getConfig() {
+ return AccumuloConfiguration.getDefaultConfiguration();
+ }
+
+ @Override
+ public IteratorScope getIteratorScope() {
+ return IteratorScope.scan;
+ }
+
+ @Override
+ public boolean isFullMajorCompaction() {
+ return false;
+ }
+
+ private ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>();
+
+ @Override
+ public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
+ topLevelIterators.add(iter);
+ }
+
+ @Override
+ public Authorizations getAuthorizations() {
+ return authorizations;
+ }
+
+ SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
+ if (topLevelIterators.isEmpty())
+ return iter;
+ ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<SortedKeyValueIterator<Key,Value>>(topLevelIterators);
+ allIters.add(iter);
+ return new MultiIterator(allIters, false);
+ }
+ }
+
+ private SortedKeyValueIterator<Key,Value> iter;
+ private Range range;
+ private KeyExtent currentExtent;
+ private Connector conn;
+ private String tableId;
+ private Authorizations authorizations;
+ private Instance instance;
+ private ScannerOptions options;
+ private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
+ private AccumuloConfiguration config;
+
+ public OfflineIterator(ScannerOptions options, Instance instance, Credentials credentials, Authorizations authorizations, Text table, Range range) {
+ this.options = new ScannerOptions(options);
+ this.instance = instance;
+ this.range = range;
+
+ if (this.options.fetchedColumns.size() > 0) {
+ this.range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
+ }
+
+ this.tableId = table.toString();
+ this.authorizations = authorizations;
+ this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
+
+ try {
+ conn = instance.getConnector(credentials.getPrincipal(), credentials.getToken());
+ config = new ConfigurationCopy(conn.instanceOperations().getSiteConfiguration());
+ nextTablet();
+
+ while (iter != null && !iter.hasTop())
+ nextTablet();
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter != null && iter.hasTop();
+ }
+
+ @Override
+ public Entry<Key,Value> next() {
+ try {
+ byte[] v = iter.getTopValue().get();
+ // copy just like tablet server does, do this before calling next
+ KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length));
+
+ iter.next();
+
+ while (iter != null && !iter.hasTop())
+ nextTablet();
+
+ return ret;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void nextTablet() throws TableNotFoundException, AccumuloException, IOException {
+
+ Range nextRange = null;
+
+ if (currentExtent == null) {
+ Text startRow;
+
+ if (range.getStartKey() != null)
+ startRow = range.getStartKey().getRow();
+ else
+ startRow = new Text();
+
+ nextRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
+ } else {
+
+ if (currentExtent.getEndRow() == null) {
+ iter = null;
+ return;
+ }
+
+ if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) {
+ iter = null;
+ return;
+ }
+
+ nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false);
+ }
+
+ List<String> relFiles = new ArrayList<String>();
+
+ Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles);
+
+ while (eloc.getSecond() != null) {
+ if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+ Tables.clearCache(instance);
+ if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
+ throw new AccumuloException("Table is online " + tableId + " cannot scan tablet in offline mode " + eloc.getFirst());
+ }
+ }
+
+ UtilWaitThread.sleep(250);
+
+ eloc = getTabletFiles(nextRange, relFiles);
+ }
+
+ KeyExtent extent = eloc.getFirst();
+
+ if (!extent.getTableId().toString().equals(tableId)) {
+ throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent);
+ }
+
+ if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
+ throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
+
+ // The old property is only used to resolve relative paths into absolute paths. For systems upgraded
+ // with relative paths, it's assumed that instance.dfs.{uri,dir} is still correct in the configuration.
+ @SuppressWarnings("deprecation")
+ String tablesDir = config.get(Property.INSTANCE_DFS_DIR) + Constants.HDFS_TABLES_DIR;
+
+ List<String> absFiles = new ArrayList<String>();
+ for (String relPath : relFiles) {
+ if (relPath.contains(":")) {
+ absFiles.add(relPath);
+ } else {
+ // handle old-style relative paths
+ if (relPath.startsWith("..")) {
+ absFiles.add(tablesDir + relPath.substring(2));
+ } else {
+ absFiles.add(tablesDir + "/" + tableId + relPath);
+ }
+ }
+ }
+
+ iter = createIterator(extent, absFiles);
+ iter.seek(range, LocalityGroupUtil.families(options.fetchedColumns), options.fetchedColumns.size() == 0 ? false : true);
+ currentExtent = extent;
+
+ }
+
+ private Pair<KeyExtent,String> getTabletFiles(Range nextRange, List<String> relFiles) throws TableNotFoundException {
+ Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ scanner.setBatchSize(100);
+ scanner.setRange(nextRange);
+
+ RowIterator rowIter = new RowIterator(scanner);
+ Iterator<Entry<Key,Value>> row = rowIter.next();
+
+ KeyExtent extent = null;
+ String location = null;
+
+ while (row.hasNext()) {
+ Entry<Key,Value> entry = row.next();
+ Key key = entry.getKey();
+
+ if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+ relFiles.add(key.getColumnQualifier().toString());
+ }
+
+ if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
+ || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
+ location = entry.getValue().toString();
+ }
+
+ if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
+ extent = new KeyExtent(key.getRow(), entry.getValue());
+ }
+
+ }
+ return new Pair<KeyExtent,String>(extent, location);
+ }
+
+ private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent, List<String> absFiles) throws TableNotFoundException, AccumuloException,
+ IOException {
+
+ // TODO share code w/ tablet - ACCUMULO-1303
+ AccumuloConfiguration acuTableConf = AccumuloConfiguration.getTableConfiguration(conn, tableId);
+
+ Configuration conf = CachedConfiguration.getInstance();
+
+ for (SortedKeyValueIterator<Key,Value> reader : readers) {
+ ((FileSKVIterator) reader).close();
+ }
+
+ readers.clear();
+
+ // TODO need to close files - ACCUMULO-1303
+ for (String file : absFiles) {
+ FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem();
+ FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null);
+ readers.add(reader);
+ }
+
+ MultiIterator multiIter = new MultiIterator(readers, extent);
+
+ OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations);
+
+ DeletingIterator delIter = new DeletingIterator(multiIter, false);
+
+ ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+
+ ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet<Column>(options.fetchedColumns));
+
+ byte[] defaultSecurityLabel;
+
+ ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
+ defaultSecurityLabel = cv.getExpression();
+
+ VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel);
+
+ return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList,
+ options.serverSideIteratorOptions, iterEnv, false));
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+}
\ No newline at end of file
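
The relative-path handling in nextTablet() above covers three cases; a standalone sketch of the
same rules (the method name and signature are illustrative, not from the patch):

    // Sketch of the resolution loop in nextTablet(); tablesDir and tableId are assumed inputs.
    static String toAbsolute(String relPath, String tablesDir, String tableId) {
      if (relPath.contains(":"))
        return relPath;                            // already an absolute URI, e.g. hdfs://...
      if (relPath.startsWith(".."))
        return tablesDir + relPath.substring(2);   // old-style path relative to the tables dir
      return tablesDir + "/" + tableId + relPath;  // path relative to this table's directory
    }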
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
index 2f31319..427a7cc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
@@ -18,332 +18,18 @@ package org.apache.accumulo.core.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map.Entry;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyValue;
-import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.IteratorUtil;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
-import org.apache.accumulo.core.iterators.system.DeletingIterator;
-import org.apache.accumulo.core.iterators.system.MultiIterator;
-import org.apache.accumulo.core.iterators.system.VisibilityFilter;
-import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.LocalityGroupUtil;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.volume.VolumeConfiguration;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
-class OfflineIterator implements Iterator<Entry<Key,Value>> {
-
- static class OfflineIteratorEnvironment implements IteratorEnvironment {
-
- private final Authorizations authorizations;
-
- public OfflineIteratorEnvironment(Authorizations auths) {
- this.authorizations = auths;
- }
-
- @Override
- public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public AccumuloConfiguration getConfig() {
- return AccumuloConfiguration.getDefaultConfiguration();
- }
-
- @Override
- public IteratorScope getIteratorScope() {
- return IteratorScope.scan;
- }
-
- @Override
- public boolean isFullMajorCompaction() {
- return false;
- }
-
- private ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>();
-
- @Override
- public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
- topLevelIterators.add(iter);
- }
-
- @Override
- public Authorizations getAuthorizations() {
- return authorizations;
- }
-
- SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
- if (topLevelIterators.isEmpty())
- return iter;
- ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<SortedKeyValueIterator<Key,Value>>(topLevelIterators);
- allIters.add(iter);
- return new MultiIterator(allIters, false);
- }
- }
-
- private SortedKeyValueIterator<Key,Value> iter;
- private Range range;
- private KeyExtent currentExtent;
- private Connector conn;
- private String tableId;
- private Authorizations authorizations;
- private Instance instance;
- private ScannerOptions options;
- private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
- private AccumuloConfiguration config;
-
- public OfflineIterator(ScannerOptions options, Instance instance, Credentials credentials, Authorizations authorizations, Text table, Range range) {
- this.options = new ScannerOptions(options);
- this.instance = instance;
- this.range = range;
-
- if (this.options.fetchedColumns.size() > 0) {
- this.range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
- }
-
- this.tableId = table.toString();
- this.authorizations = authorizations;
- this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
-
- try {
- conn = instance.getConnector(credentials.getPrincipal(), credentials.getToken());
- config = new ConfigurationCopy(conn.instanceOperations().getSiteConfiguration());
- nextTablet();
-
- while (iter != null && !iter.hasTop())
- nextTablet();
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public boolean hasNext() {
- return iter != null && iter.hasTop();
- }
-
- @Override
- public Entry<Key,Value> next() {
- try {
- byte[] v = iter.getTopValue().get();
- // copy just like tablet server does, do this before calling next
- KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length));
-
- iter.next();
-
- while (iter != null && !iter.hasTop())
- nextTablet();
-
- return ret;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private void nextTablet() throws TableNotFoundException, AccumuloException, IOException {
-
- Range nextRange = null;
-
- if (currentExtent == null) {
- Text startRow;
-
- if (range.getStartKey() != null)
- startRow = range.getStartKey().getRow();
- else
- startRow = new Text();
-
- nextRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
- } else {
-
- if (currentExtent.getEndRow() == null) {
- iter = null;
- return;
- }
-
- if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) {
- iter = null;
- return;
- }
-
- nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false);
- }
-
- List<String> relFiles = new ArrayList<String>();
-
- Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles);
-
- while (eloc.getSecond() != null) {
- if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
- Tables.clearCache(instance);
- if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
- throw new AccumuloException("Table is online " + tableId + " cannot scan tablet in offline mode " + eloc.getFirst());
- }
- }
-
- UtilWaitThread.sleep(250);
-
- eloc = getTabletFiles(nextRange, relFiles);
- }
-
- KeyExtent extent = eloc.getFirst();
-
- if (!extent.getTableId().toString().equals(tableId)) {
- throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent);
- }
-
- if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
- throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
-
- // Old property is only used to resolve relative paths into absolute paths. For systems upgraded
- // with relative paths, it's assumed that correct instance.dfs.{uri,dir} is still correct in the configuration
- @SuppressWarnings("deprecation")
- String tablesDir = config.get(Property.INSTANCE_DFS_DIR) + Constants.HDFS_TABLES_DIR;
-
- List<String> absFiles = new ArrayList<String>();
- for (String relPath : relFiles) {
- if (relPath.contains(":")) {
- absFiles.add(relPath);
- } else {
- // handle old-style relative paths
- if (relPath.startsWith("..")) {
- absFiles.add(tablesDir + relPath.substring(2));
- } else {
- absFiles.add(tablesDir + "/" + tableId + relPath);
- }
- }
- }
-
- iter = createIterator(extent, absFiles);
- iter.seek(range, LocalityGroupUtil.families(options.fetchedColumns), options.fetchedColumns.size() == 0 ? false : true);
- currentExtent = extent;
-
- }
-
- private Pair<KeyExtent,String> getTabletFiles(Range nextRange, List<String> relFiles) throws TableNotFoundException {
- Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- scanner.setBatchSize(100);
- scanner.setRange(nextRange);
-
- RowIterator rowIter = new RowIterator(scanner);
- Iterator<Entry<Key,Value>> row = rowIter.next();
-
- KeyExtent extent = null;
- String location = null;
-
- while (row.hasNext()) {
- Entry<Key,Value> entry = row.next();
- Key key = entry.getKey();
-
- if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
- relFiles.add(key.getColumnQualifier().toString());
- }
-
- if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)
- || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
- location = entry.getValue().toString();
- }
-
- if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
- extent = new KeyExtent(key.getRow(), entry.getValue());
- }
-
- }
- return new Pair<KeyExtent,String>(extent, location);
- }
-
- private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent, List<String> absFiles) throws TableNotFoundException, AccumuloException,
- IOException {
-
- // TODO share code w/ tablet - ACCUMULO-1303
- AccumuloConfiguration acuTableConf = AccumuloConfiguration.getTableConfiguration(conn, tableId);
-
- Configuration conf = CachedConfiguration.getInstance();
-
- for (SortedKeyValueIterator<Key,Value> reader : readers) {
- ((FileSKVIterator) reader).close();
- }
-
- readers.clear();
-
- // TODO need to close files - ACCUMULO-1303
- for (String file : absFiles) {
- FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem();
- FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null);
- readers.add(reader);
- }
-
- MultiIterator multiIter = new MultiIterator(readers, extent);
-
- OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations);
-
- DeletingIterator delIter = new DeletingIterator(multiIter, false);
-
- ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
-
- ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet<Column>(options.fetchedColumns));
-
- byte[] defaultSecurityLabel;
-
- ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
- defaultSecurityLabel = cv.getExpression();
-
- VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel);
-
- return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList,
- options.serverSideIteratorOptions, iterEnv, false));
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
-}
-
-/**
- *
- */
public class OfflineScanner extends ScannerOptions implements Scanner {
private int batchSize;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
index a45a692..43f8c0f 100644
--- a/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java
@@ -18,48 +18,6 @@
package org.apache.accumulo.core.compaction;
import java.util.Map;
-import java.util.regex.Pattern;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-
-import com.google.common.base.Preconditions;
-
-interface Type {
- String convert(String str);
-}
-
-class SizeType implements Type {
- @Override
- public String convert(String str) {
- long size = AccumuloConfiguration.getMemoryInBytes(str);
- Preconditions.checkArgument(size > 0);
- return Long.toString(size);
- }
-}
-
-class PatternType implements Type {
- @Override
- public String convert(String str) {
- // ensure it compiles
- Pattern.compile(str);
- return str;
- }
-}
-
-class UIntType implements Type {
- @Override
- public String convert(String str) {
- Preconditions.checkArgument(Integer.parseInt(str) > 0);
- return str;
- }
-}
-
-class StringType implements Type {
- @Override
- public String convert(String str) {
- return str;
- }
-}
public enum CompactionSettings {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java b/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java
new file mode 100644
index 0000000..c52dcb4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.compaction;
+
+import java.util.regex.Pattern;
+
+class PatternType implements Type {
+ @Override
+ public String convert(String str) {
+ // ensure it compiles
+ Pattern.compile(str);
+ return str;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java b/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java
new file mode 100644
index 0000000..c2af401
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.compaction;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+
+import com.google.common.base.Preconditions;
+
+class SizeType implements Type {
+ @Override
+ public String convert(String str) {
+ long size = AccumuloConfiguration.getMemoryInBytes(str);
+ Preconditions.checkArgument(size > 0);
+ return Long.toString(size);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java b/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java
new file mode 100644
index 0000000..7098a5c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.compaction;
+
+class StringType implements Type {
+ @Override
+ public String convert(String str) {
+ return str;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/Type.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/Type.java b/core/src/main/java/org/apache/accumulo/core/compaction/Type.java
new file mode 100644
index 0000000..d8f81a6
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/Type.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.compaction;
+
+interface Type {
+ String convert(String str);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java b/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java
new file mode 100644
index 0000000..c8880fc
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.compaction;
+
+import com.google.common.base.Preconditions;
+
+class UIntType implements Type {
+ @Override
+ public String convert(String str) {
+ Preconditions.checkArgument(Integer.parseInt(str) > 0);
+ return str;
+ }
+}
\ No newline at end of file
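
Each of the extracted converters implements the package-private Type interface above; a hedged
usage sketch (the call site is illustrative; CompactionSettings is the actual consumer):

    Type size = new SizeType();
    size.convert("32M");        // parses a memory spec to bytes, rejects non-positive sizes
    Type pattern = new PatternType();
    pattern.convert(".*\\.rf"); // compiles the regex to prove validity, returns it unchanged
    Type uint = new UIntType();
    uint.convert("10");         // rejects values that do not parse as a positive int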
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
new file mode 100644
index 0000000..128a931
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.map.MapFileOperations;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+class DispatchingFileFactory extends FileOperations {
+
+ private FileOperations findFileFactory(String file) {
+
+ Path p = new Path(file);
+ String name = p.getName();
+
+ if (name.startsWith(Constants.MAPFILE_EXTENSION + "_")) {
+ return new MapFileOperations();
+ }
+ String[] sp = name.split("\\.");
+
+ if (sp.length < 2) {
+ throw new IllegalArgumentException("File name " + name + " has no extension");
+ }
+
+ String extension = sp[sp.length - 1];
+
+ if (extension.equals(Constants.MAPFILE_EXTENSION) || extension.equals(Constants.MAPFILE_EXTENSION + "_tmp")) {
+ return new MapFileOperations();
+ } else if (extension.equals(RFile.EXTENSION) || extension.equals(RFile.EXTENSION + "_tmp")) {
+ return new RFileOperations();
+ } else {
+ throw new IllegalArgumentException("File type " + extension + " not supported");
+ }
+ }
+
+ @Override
+ public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+ return findFileFactory(file).openIndex(file, fs, conf, acuconf, null, null);
+ }
+
+ @Override
+ public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+ FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
+ if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
+ return new BloomFilterLayer.Reader(iter, acuconf);
+ }
+ return iter;
+ }
+
+ @Override
+ public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+ FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, acuconf);
+ if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
+ return new BloomFilterLayer.Writer(writer, acuconf);
+ }
+ return writer;
+ }
+
+ @Override
+ public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+ return findFileFactory(file).getFileSize(file, fs, conf, acuconf);
+ }
+
+ @Override
+ public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
+ AccumuloConfiguration tableConf) throws IOException {
+ return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, null, null);
+ }
+
+ @Override
+ public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
+ AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+
+ if (!tableConf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
+ indexCache = null;
+ if (!tableConf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
+ dataCache = null;
+
+ return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, dataCache, indexCache);
+ }
+
+ @Override
+ public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
+ BlockCache dataCache, BlockCache indexCache) throws IOException {
+
+ if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
+ indexCache = null;
+ if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
+ dataCache = null;
+
+ FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, dataCache, indexCache);
+ if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
+ return new BloomFilterLayer.Reader(iter, acuconf);
+ }
+ return iter;
+ }
+
+ @Override
+ public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache)
+ throws IOException {
+
+ if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
+ iCache = null;
+ if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
+ dCache = null;
+
+ return findFileFactory(file).openIndex(file, fs, conf, acuconf, dCache, iCache);
+ }
+
+}
\ No newline at end of file
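
The factory above selects an implementation purely from the file name; a hedged illustration of
the dispatch rules (the file name is an example, assuming the standard rfile and map-file extensions):

    // name starting with the map-file prefix -> MapFileOperations
    // extension "rf" or "rf_tmp"             -> RFileOperations
    // map-file extension or its _tmp variant -> MapFileOperations
    // anything else                          -> IllegalArgumentException
    FileSKVIterator reader = FileOperations.getInstance()
        .openReader("/accumulo/tables/1/default_tablet/A0000.rf", false, fs, conf, acuconf);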
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 78d0407..3798453 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -27,115 +27,9 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
-import org.apache.accumulo.core.file.map.MapFileOperations;
import org.apache.accumulo.core.file.rfile.RFile;
-import org.apache.accumulo.core.file.rfile.RFileOperations;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-class DispatchingFileFactory extends FileOperations {
-
- private FileOperations findFileFactory(String file) {
-
- Path p = new Path(file);
- String name = p.getName();
-
- if (name.startsWith(Constants.MAPFILE_EXTENSION + "_")) {
- return new MapFileOperations();
- }
- String[] sp = name.split("\\.");
-
- if (sp.length < 2) {
- throw new IllegalArgumentException("File name " + name + " has no extension");
- }
-
- String extension = sp[sp.length - 1];
-
- if (extension.equals(Constants.MAPFILE_EXTENSION) || extension.equals(Constants.MAPFILE_EXTENSION + "_tmp")) {
- return new MapFileOperations();
- } else if (extension.equals(RFile.EXTENSION) || extension.equals(RFile.EXTENSION + "_tmp")) {
- return new RFileOperations();
- } else {
- throw new IllegalArgumentException("File type " + extension + " not supported");
- }
- }
-
- @Override
- public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- return findFileFactory(file).openIndex(file, fs, conf, acuconf, null, null);
- }
-
- @Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
- if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
- return new BloomFilterLayer.Reader(iter, acuconf);
- }
- return iter;
- }
-
- @Override
- public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, acuconf);
- if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
- return new BloomFilterLayer.Writer(writer, acuconf);
- }
- return writer;
- }
-
- @Override
- public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- return findFileFactory(file).getFileSize(file, fs, conf, acuconf);
- }
-
- @Override
- public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- AccumuloConfiguration tableConf) throws IOException {
- return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, null, null);
- }
-
- @Override
- public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
-
- if (!tableConf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
- indexCache = null;
- if (!tableConf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
- dataCache = null;
-
- return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, dataCache, indexCache);
- }
-
- @Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
- BlockCache dataCache, BlockCache indexCache) throws IOException {
-
- if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
- indexCache = null;
- if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
- dataCache = null;
-
- FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, dataCache, indexCache);
- if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
- return new BloomFilterLayer.Reader(iter, acuconf);
- }
- return iter;
- }
-
- @Override
- public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache)
- throws IOException {
-
- if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
- iCache = null;
- if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
- dCache = null;
-
- return findFileFactory(file).openIndex(file, fs, conf, acuconf, dCache, iCache);
- }
-
-}
public abstract class FileOperations {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
index f0fdcca..65df5c9 100644
--- a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
+++ b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
@@ -263,5 +263,10 @@ public class TestClientOpts {
public boolean equals(Object o) {
return o instanceof EmptyToken;
}
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java
new file mode 100644
index 0000000..873f886
--- /dev/null
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.client;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal class used to verify validity of data read.
+ */
+class CountingVerifyingReceiver {
+ private static final Logger log = LoggerFactory.getLogger(CountingVerifyingReceiver.class);
+
+ long count = 0;
+ int expectedValueSize = 0;
+ HashMap<Text,Boolean> expectedRows;
+
+ CountingVerifyingReceiver(HashMap<Text,Boolean> expectedRows, int expectedValueSize) {
+ this.expectedRows = expectedRows;
+ this.expectedValueSize = expectedValueSize;
+ }
+
+ public void receive(Key key, Value value) {
+
+ String row = key.getRow().toString();
+ long rowid = Integer.parseInt(row.split("_")[1]);
+
+ byte expectedValue[] = RandomBatchWriter.createValue(rowid, expectedValueSize);
+
+ if (!Arrays.equals(expectedValue, value.get())) {
+ log.error("Got unexpected value for " + key + " expected : " + new String(expectedValue, UTF_8) + " got : " + new String(value.get(), UTF_8));
+ }
+
+ if (!expectedRows.containsKey(key.getRow())) {
+ log.error("Got unexpected key " + key);
+ } else {
+ expectedRows.put(key.getRow(), true);
+ }
+
+ count++;
+ }
+}
\ No newline at end of file
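
A hedged sketch of how the extracted receiver is driven (the batchScanner variable and the
"row_<id>" naming are assumptions based on RandomBatchScanner and RandomBatchWriter):

    HashMap<Text,Boolean> expectedRows = new HashMap<Text,Boolean>();
    expectedRows.put(new Text("row_0000000042"), false);  // rows the scan is expected to return
    CountingVerifyingReceiver receiver = new CountingVerifyingReceiver(expectedRows, 50);
    for (Entry<Key,Value> entry : batchScanner)           // batchScanner: an assumed BatchScanner
      receiver.receive(entry.getKey(), entry.getValue());
    // receiver.count now holds the entries seen; unvisited rows remain mapped to false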
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java
index 6f8b485..a43b97d 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java
@@ -16,10 +16,8 @@
*/
package org.apache.accumulo.examples.simple.client;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.examples.simple.client.RandomBatchWriter.abs;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map.Entry;
@@ -43,42 +41,6 @@ import org.slf4j.LoggerFactory;
import com.beust.jcommander.Parameter;
/**
- * Internal class used to verify validity of data read.
- */
-class CountingVerifyingReceiver {
- private static final Logger log = LoggerFactory.getLogger(CountingVerifyingReceiver.class);
-
- long count = 0;
- int expectedValueSize = 0;
- HashMap<Text,Boolean> expectedRows;
-
- CountingVerifyingReceiver(HashMap<Text,Boolean> expectedRows, int expectedValueSize) {
- this.expectedRows = expectedRows;
- this.expectedValueSize = expectedValueSize;
- }
-
- public void receive(Key key, Value value) {
-
- String row = key.getRow().toString();
- long rowid = Integer.parseInt(row.split("_")[1]);
-
- byte expectedValue[] = RandomBatchWriter.createValue(rowid, expectedValueSize);
-
- if (!Arrays.equals(expectedValue, value.get())) {
- log.error("Got unexpected value for " + key + " expected : " + new String(expectedValue, UTF_8) + " got : " + new String(value.get(), UTF_8));
- }
-
- if (!expectedRows.containsKey(key.getRow())) {
- log.error("Got unexpected key " + key);
- } else {
- expectedRows.put(key.getRow(), true);
- }
-
- count++;
- }
-}
-
-/**
* Simple example for reading random batches of data from Accumulo. See docs/examples/README.batch for instructions.
*/
public class RandomBatchScanner {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0bcc689..f680f84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -946,6 +946,7 @@
<property name="eachLine" value="true" />
</module>
<module name="TreeWalker">
+ <module name="OneTopLevelClass" />
<module name="RegexpSinglelineJava">
<property name="format" value="\s+$" />
<property name="message" value="Line has trailing whitespace." />
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 7f83988..031a80c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -16,71 +16,34 @@
*/
package org.apache.accumulo.master.tableOps;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IsolatedScanner;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.impl.ServerClient;
import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.client.impl.thrift.ClientService;
-import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.trace.Tracer;
-import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
-import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.tablets.UniqueNameAllocator;
import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.Text;
-import org.apache.htrace.wrappers.TraceExecutorService;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -302,329 +265,3 @@ public class BulkImport extends MasterRepo {
Utils.getReadLock(tableId, tid).unlock();
}
}
-
-class CleanUpBulkImport extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger log = LoggerFactory.getLogger(CleanUpBulkImport.class);
-
- private String tableId;
- private String source;
- private String bulk;
- private String error;
-
- public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.error = error;
- }
-
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- log.debug("removing the bulk processing flag file in " + bulk);
- Path bulkDir = new Path(bulk);
- MetadataTableUtil.removeBulkLoadInProgressFlag(master, "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
- MetadataTableUtil.addDeleteEntry(master, tableId, bulkDir.toString());
- log.debug("removing the metadata table markers for loaded files");
- Connector conn = master.getConnector();
- MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
- log.debug("releasing HDFS reservations for " + source + " and " + error);
- Utils.unreserveHdfsDirectory(source, tid);
- Utils.unreserveHdfsDirectory(error, tid);
- Utils.getReadLock(tableId, tid).unlock();
- log.debug("completing bulk import transaction " + tid);
- ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
- return null;
- }
-}
-
-class CompleteBulkImport extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private String tableId;
- private String source;
- private String bulk;
- private String error;
-
- public CompleteBulkImport(String tableId, String source, String bulk, String error) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.error = error;
- }
-
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
- return new CopyFailed(tableId, source, bulk, error);
- }
-}
-
-class CopyFailed extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private String tableId;
- private String source;
- private String bulk;
- private String error;
-
- public CopyFailed(String tableId, String source, String bulk, String error) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.error = error;
- }
-
- @Override
- public long isReady(long tid, Master master) throws Exception {
- Set<TServerInstance> finished = new HashSet<TServerInstance>();
- Set<TServerInstance> running = master.onlineTabletServers();
- for (TServerInstance server : running) {
- try {
- TServerConnection client = master.getConnection(server);
- if (client != null && !client.isActive(tid))
- finished.add(server);
- } catch (TException ex) {
- log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
- }
- }
- if (finished.containsAll(running))
- return 0;
- return 500;
- }
-
- @Override
- public Repo<Master> call(long tid, Master master) throws Exception {
- // This needs to execute after the arbiter is stopped
-
- VolumeManager fs = master.getFileSystem();
-
- if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
- return new CleanUpBulkImport(tableId, source, bulk, error);
-
- HashMap<FileRef,String> failures = new HashMap<FileRef,String>();
- HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>();
-
- try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(error, BulkImport.FAILURES_TXT)), UTF_8))) {
- String line = null;
- while ((line = in.readLine()) != null) {
- Path path = new Path(line);
- if (!fs.exists(new Path(error, path.getName())))
- failures.put(new FileRef(line, path), line);
- }
- }
-
- /*
- * I thought I could move files that have no file references in the table. However, it's possible a clone references a file. Therefore only move files that
- * have no loaded markers.
- */
-
- // determine which failed files were loaded
- Connector conn = master.getConnector();
- Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
- mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
- for (Entry<Key,Value> entry : mscanner) {
- if (Long.parseLong(entry.getValue().toString()) == tid) {
- FileRef loadedFile = new FileRef(fs, entry.getKey());
- String absPath = failures.remove(loadedFile);
- if (absPath != null) {
- loadedFailures.put(loadedFile, absPath);
- }
- }
- }
-
- // move failed files that were not loaded
- for (String failure : failures.values()) {
- Path orig = new Path(failure);
- Path dest = new Path(error, orig.getName());
- fs.rename(orig, dest);
- log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
- }
-
- if (loadedFailures.size() > 0) {
- DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + master.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ,
- master.getConfiguration());
-
- HashSet<String> workIds = new HashSet<String>();
-
- for (String failure : loadedFailures.values()) {
- Path orig = new Path(failure);
- Path dest = new Path(error, orig.getName());
-
- if (fs.exists(dest))
- continue;
-
- bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8));
- workIds.add(orig.getName());
- log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
- }
-
- bifCopyQueue.waitUntilDone(workIds);
- }
-
- fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
- return new CleanUpBulkImport(tableId, source, bulk, error);
- }
-
-}
-
-class LoadFiles extends MasterRepo {
-
- private static final long serialVersionUID = 1L;
-
- private static ExecutorService threadPool = null;
- private static final Logger log = LoggerFactory.getLogger(BulkImport.class);
-
- private String tableId;
- private String source;
- private String bulk;
- private String errorDir;
- private boolean setTime;
-
- public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
- this.tableId = tableId;
- this.source = source;
- this.bulk = bulk;
- this.errorDir = errorDir;
- this.setTime = setTime;
- }
-
- @Override
- public long isReady(long tid, Master master) throws Exception {
- if (master.onlineTabletServers().size() == 0)
- return 500;
- return 0;
- }
-
- private static synchronized ExecutorService getThreadPool(Master master) {
- if (threadPool == null) {
- int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
- ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
- pool.allowCoreThreadTimeOut(true);
- threadPool = new TraceExecutorService(pool);
- }
- return threadPool;
- }
-
- @Override
- public Repo<Master> call(final long tid, final Master master) throws Exception {
- ExecutorService executor = getThreadPool(master);
- final AccumuloConfiguration conf = master.getConfiguration();
- VolumeManager fs = master.getFileSystem();
- List<FileStatus> files = new ArrayList<FileStatus>();
- for (FileStatus entry : fs.listStatus(new Path(bulk))) {
- files.add(entry);
- }
- log.debug("tid " + tid + " importing " + files.size() + " files");
-
- Path writable = new Path(this.errorDir, ".iswritable");
- if (!fs.createNewFile(writable)) {
- // Maybe this is a re-try... clear the flag and try again
- fs.delete(writable);
- if (!fs.createNewFile(writable))
- throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
- "Unable to write to " + this.errorDir);
- }
- fs.delete(writable);
-
- final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
- for (FileStatus f : files)
- filesToLoad.add(f.getPath().toString());
-
- final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
- for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
- List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
-
- if (master.onlineTabletServers().size() == 0)
- log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
-
- while (master.onlineTabletServers().size() == 0) {
- UtilWaitThread.sleep(500);
- }
-
- // Use the threadpool to assign files one-at-a-time to the server
- final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
- for (final String file : filesToLoad) {
- results.add(executor.submit(new Callable<List<String>>() {
- @Override
- public List<String> call() {
- List<String> failures = new ArrayList<String>();
- ClientService.Client client = null;
- String server = null;
- try {
- // get a connection to a random tablet server, do not prefer cached connections because
- // this is running on the master and there are lots of connections to tablet servers
- // serving the metadata tablets
- long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
- Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
- client = pair.getSecond();
- server = pair.getFirst();
- List<String> attempt = Collections.singletonList(file);
- log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
- List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
- if (fail.isEmpty()) {
- loaded.add(file);
- } else {
- failures.addAll(fail);
- }
- } catch (Exception ex) {
- log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
- } finally {
- ServerClient.close(client);
- }
- return failures;
- }
- }));
- }
- Set<String> failures = new HashSet<String>();
- for (Future<List<String>> f : results)
- failures.addAll(f.get());
- filesToLoad.removeAll(loaded);
- if (filesToLoad.size() > 0) {
- log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
- UtilWaitThread.sleep(100);
- }
- }
-
- FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
- BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
- try {
- for (String f : filesToLoad) {
- out.write(f);
- out.write("\n");
- }
- } finally {
- out.close();
- }
-
- // return the next step, which will perform cleanup
- return new CompleteBulkImport(tableId, source, bulk, errorDir);
- }
-
- static String sampleList(Collection<?> potentiallyLongList, int max) {
- StringBuffer result = new StringBuffer();
- result.append("[");
- int i = 0;
- for (Object obj : potentiallyLongList) {
- result.append(obj);
- if (i >= max) {
- result.append("...");
- break;
- } else {
- result.append(", ");
- }
- i++;
- }
- if (i < max)
- result.delete(result.length() - 2, result.length());
- result.append("]");
- return result.toString();
- }
-
-}
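
The repo classes removed above (and their new standalone counterparts) all follow the same FATE step contract: isReady returns 0 when the step may run, or a back-off delay in milliseconds; call does the step's work and returns the next step, or null to end the transaction. A sketch of that contract, inferred from the code above (ExampleStep is hypothetical):

    class ExampleStep extends MasterRepo {
      private static final long serialVersionUID = 1L;

      @Override
      public long isReady(long tid, Master master) throws Exception {
        return 0; // ready now; LoadFiles instead returns 500 (ms) while no tservers are online
      }

      @Override
      public Repo<Master> call(long tid, Master master) throws Exception {
        // perform this step's work, then hand off:
        return null; // ends the chain; CompleteBulkImport instead returns new CopyFailed(...)
      }
    }
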
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
index 4f4b27e..e268f17 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
@@ -27,29 +27,6 @@ import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-class FinishCancelCompaction extends MasterRepo {
- private static final long serialVersionUID = 1L;
- private String tableId;
-
- public FinishCancelCompaction(String tableId) {
- this.tableId = tableId;
- }
-
- @Override
- public Repo<Master> call(long tid, Master environment) throws Exception {
- Utils.getReadLock(tableId, tid).unlock();
- return null;
- }
-
- @Override
- public void undo(long tid, Master environment) throws Exception {
-
- }
-}
-
-/**
- *
- */
public class CancelCompactions extends MasterRepo {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java
new file mode 100644
index 0000000..3e1aa33
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+
+class ChooseDir extends MasterRepo {
+ private static final long serialVersionUID = 1L;
+
+ private TableInfo tableInfo;
+
+ ChooseDir(TableInfo ti) {
+ this.tableInfo = ti;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+ // Constants.DEFAULT_TABLET_LOCATION already begins with a slash, so we don't need to add one here
+ tableInfo.dir = master.getFileSystem().choose(Optional.of(tableInfo.tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+ + tableInfo.tableId + Constants.DEFAULT_TABLET_LOCATION;
+ return new CreateDir(tableInfo);
+ }
+
+ @Override
+ public void undo(long tid, Master master) throws Exception {
+
+ }
+}
\ No newline at end of file
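
To make the path assembly in call() concrete, a worked example assuming Constants.HDFS_TABLES_DIR is "/tables" and Constants.DEFAULT_TABLET_LOCATION is "/default_tablet" (their usual values, worth verifying against Constants.java), with a chosen base URI of hdfs://namenode/accumulo and table id "2":

    // base URI from choose():       hdfs://namenode/accumulo
    // + HDFS_TABLES_DIR:            /tables
    // + Path.SEPARATOR + tableId:   /2
    // + DEFAULT_TABLET_LOCATION:    /default_tablet
    // tableInfo.dir => "hdfs://namenode/accumulo/tables/2/default_tablet"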