You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/10/14 21:38:41 UTC
[1/3] accumulo git commit: ACCUMULO-4028 copy the list of online
servers once
Repository: accumulo
Updated Branches:
refs/heads/master dd420f631 -> b34df1aba
ACCUMULO-4028 copy the list of online servers once
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/eacc27ac
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/eacc27ac
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/eacc27ac
Branch: refs/heads/master
Commit: eacc27ac47db3417a9ae405a96f7da6db2e0955b
Parents: 0212f2f
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Oct 14 15:32:58 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Wed Oct 14 15:32:58 2015 -0400
----------------------------------------------------------------------
.../java/org/apache/accumulo/master/tableOps/BulkImport.java | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/eacc27ac/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 5320aae..37edbc9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -538,6 +538,7 @@ class LoadFiles extends MasterRepo {
// Use the threadpool to assign files one-at-a-time to the server
final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
+ final TServerInstance servers[] = master.onlineTabletServers().toArray(new TServerInstance[0]);
final Random random = new Random();
for (final String file : filesToLoad) {
results.add(executor.submit(new Callable<List<String>>() {
@@ -551,7 +552,6 @@ class LoadFiles extends MasterRepo {
// this is running on the master and there are lots of connections to tablet servers
// serving the metadata tablets
long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
- TServerInstance servers[] = master.onlineTabletServers().toArray(new TServerInstance[0]);
server = servers[random.nextInt(servers.length)].getLocation().toString();
client = ThriftUtil.getTServerClient(server, master.getConfiguration().getConfiguration(), timeInMillis);
List<String> attempt = Collections.singletonList(file);
@@ -566,9 +566,7 @@ class LoadFiles extends MasterRepo {
} catch (Exception ex) {
log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
} finally {
- if (client != null) {
- ThriftUtil.returnClient(client);
- }
+ ThriftUtil.returnClient(client);
}
return failures;
}
[3/3] accumulo git commit: Merge branch '1.7'
Posted by ec...@apache.org.
Merge branch '1.7'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b34df1ab
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b34df1ab
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b34df1ab
Branch: refs/heads/master
Commit: b34df1abaf5a1df88c8e3f53e61242b41a752231
Parents: dd420f6 d7e7c64
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Oct 14 15:38:13 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Wed Oct 14 15:38:13 2015 -0400
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/master/tableOps/LoadFiles.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b34df1ab/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
[2/3] accumulo git commit: ACCUMULO-4028 copy the list of online
servers once
Posted by ec...@apache.org.
ACCUMULO-4028 copy the list of online servers once
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d7e7c643
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d7e7c643
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d7e7c643
Branch: refs/heads/master
Commit: d7e7c643c0f52c04175947955b59b173e0f62f58
Parents: 78e2c65 eacc27a
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Oct 14 15:37:58 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Wed Oct 14 15:37:58 2015 -0400
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/master/tableOps/LoadFiles.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d7e7c643/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index a80e1ff,0000000..af5262a
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@@ -1,213 -1,0 +1,213 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.htrace.wrappers.TraceExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.net.HostAndPort;
+
+class LoadFiles extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private static ExecutorService threadPool = null;
+ private static final Logger log = LoggerFactory.getLogger(LoadFiles.class);
+
+ private String tableId;
+ private String source;
+ private String bulk;
+ private String errorDir;
+ private boolean setTime;
+
+ /**
+  * Captures the parameters of one bulk-import load step.
+  *
+  * @param tableId
+  *          id of the table, forwarded to the tablet servers' bulk import call
+  * @param source
+  *          handed unchanged to the follow-up {@code CompleteBulkImport} step
+  * @param bulk
+  *          directory whose entries are the files to load
+  * @param errorDir
+  *          directory that receives the writability probe and the list of failed files
+  * @param setTime
+  *          flag forwarded to the tablet servers' bulk import call
+  */
+ public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
+   // target table and its import flag
+   this.tableId = tableId;
+   this.setTime = setTime;
+   // directories involved in the import
+   this.source = source;
+   this.bulk = bulk;
+   this.errorDir = errorDir;
+ }
+
+ /**
+  * Returns 500 while the master reports no online tablet servers, and 0 once at least one is online.
+  * NOTE(review): 500 is presumably a retry delay in milliseconds; confirm against the Repo contract.
+  */
+ @Override
+ public long isReady(long tid, Master master) throws Exception {
+   final boolean noServersOnline = master.onlineTabletServers().size() == 0;
+   return noServersOnline ? 500 : 0;
+ }
+
+ /**
+  * Returns the executor held in the static {@code threadPool} field, building it on the first call.
+  * The pool is sized from {@code Property.MASTER_BULK_THREADPOOL_SIZE}, lets its core threads time
+  * out, and is wrapped in a {@code TraceExecutorService}.
+  */
+ private static synchronized ExecutorService getThreadPool(Master master) {
+   if (threadPool != null) {
+     return threadPool;
+   }
+   final int workerCount = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
+   final ThreadPoolExecutor bulkPool = new SimpleThreadPool(workerCount, "bulk import");
+   bulkPool.allowCoreThreadTimeOut(true);
+   threadPool = new TraceExecutorService(bulkPool);
+   return threadPool;
+ }
+
+ /**
+ * Asks tablet servers to bulk import every entry of the {@code bulk} directory, one file per task.
+ * <p>
+ * Steps: list the files, verify that {@code errorDir} is writable, then make up to
+ * {@code max(1, MASTER_BULK_RETRIES)} attempts. Each attempt submits one task per remaining file to
+ * the shared executor and drops the files that were reported as loaded. Whatever is still left is
+ * written, one path per line, to {@code BulkImport.FAILURES_TXT} inside {@code errorDir}.
+ *
+ * @param tid
+ *          transaction id, used in log messages and passed to the tablet servers
+ * @param master
+ *          source of configuration, file system, online tablet servers and RPC credentials
+ * @return a {@code CompleteBulkImport} step built from the same table and directories
+ * @throws ThriftTableOperationException
+ *           with {@code BULK_BAD_ERROR_DIRECTORY} when the probe file cannot be created in
+ *           {@code errorDir}
+ */
+ @Override
+ public Repo<Master> call(final long tid, final Master master) throws Exception {
+ ExecutorService executor = getThreadPool(master);
+ final AccumuloConfiguration conf = master.getConfiguration();
+ VolumeManager fs = master.getFileSystem();
+ // collect every entry of the bulk directory
+ List<FileStatus> files = new ArrayList<FileStatus>();
+ for (FileStatus entry : fs.listStatus(new Path(bulk))) {
+ files.add(entry);
+ }
+ log.debug("tid " + tid + " importing " + files.size() + " files");
+
+ // probe that the error directory is writable by creating and removing a marker file
+ Path writable = new Path(this.errorDir, ".iswritable");
+ if (!fs.createNewFile(writable)) {
+ // Maybe this is a re-try... clear the flag and try again
+ fs.delete(writable);
+ if (!fs.createNewFile(writable))
+ throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
+ "Unable to write to " + this.errorDir);
+ }
+ fs.delete(writable);
+
+ // paths still waiting to be loaded; shrinks after every attempt
+ final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
+ for (FileStatus f : files)
+ filesToLoad.add(f.getPath().toString());
+
+ // always make at least one attempt, even if the configured retry count is below 1
+ final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
+ for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
+ List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
+
+ if (master.onlineTabletServers().size() == 0)
+ log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
+
+ // block this attempt until at least one tablet server is online, re-checking after each sleep(500)
+ while (master.onlineTabletServers().size() == 0) {
+ UtilWaitThread.sleep(500);
+ }
+
+ // Use the threadpool to assign files one-at-a-time to the server
+ final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
+ final Random random = new Random();
+ // snapshot of the online servers taken once per attempt and shared by every task submitted below
+ // NOTE(review): if the servers go offline between the wait loop above and this copy, the array is
+ // empty and random.nextInt(0) throws inside each task; the task's catch logs it and the file is retried
++ final TServerInstance[] servers = master.onlineTabletServers().toArray(new TServerInstance[0]);
+ for (final String file : filesToLoad) {
+ results.add(executor.submit(new Callable<List<String>>() {
+ @Override
+ public List<String> call() {
+ List<String> failures = new ArrayList<String>();
+ ClientService.Client client = null;
+ HostAndPort server = null;
+ try {
+ // get a connection to a random tablet server, do not prefer cached connections because
+ // this is running on the master and there are lots of connections to tablet servers
+ // serving the metadata tablets
+ long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
+ // Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
- TServerInstance[] servers = master.onlineTabletServers().toArray(new TServerInstance[0]);
+ server = servers[random.nextInt(servers.length)].getLocation();
+ client = ThriftUtil.getTServerClient(server, master, timeInMillis);
+ // each RPC carries exactly one file
+ List<String> attempt = Collections.singletonList(file);
+ log.debug("Asking " + server + " to bulk import " + file);
+ List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
+ // an empty reply means the server reported no failures for this file
+ if (fail.isEmpty()) {
+ loaded.add(file);
+ } else {
+ failures.addAll(fail);
+ }
+ } catch (Exception ex) {
+ // NOTE(review): the exception is concatenated into the message, so no stack trace is logged
+ log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
+ } finally {
+ // client is still null here when obtaining the connection failed
+ ThriftUtil.returnClient(client);
+ }
+ return failures;
+ }
+ }));
+ }
+ // f.get() waits for each task to finish
+ // NOTE(review): this set is filled but never read afterwards; retries are driven by filesToLoad
+ Set<String> failures = new HashSet<String>();
+ for (Future<List<String>> f : results)
+ failures.addAll(f.get());
+ filesToLoad.removeAll(loaded);
+ if (filesToLoad.size() > 0) {
+ log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
+ UtilWaitThread.sleep(100);
+ }
+ }
+
+ // record the files that never loaded, one path per line (the file is created even when none remain)
+ FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
+ BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
+ try {
+ for (String f : filesToLoad) {
+ out.write(f);
+ out.write("\n");
+ }
+ } finally {
+ out.close();
+ }
+
+ // return the next step, which will perform cleanup
+ return new CompleteBulkImport(tableId, source, bulk, errorDir);
+ }
+
+ /**
+  * Renders at most {@code max} elements of a collection as {@code [a, b, c]} for log messages.
+  * When the collection holds more than {@code max} elements, {@code ...} is appended directly
+  * after the last element shown, e.g. {@code [a, b...]}.
+  * <p>
+  * An empty collection yields {@code []}; a collection of exactly {@code max} elements is printed
+  * in full with no trailing separator and no ellipsis.
+  *
+  * @param potentiallyLongList
+  *          elements to sample, taken in iteration order
+  * @param max
+  *          maximum number of elements to print; zero or less prints only the ellipsis for a
+  *          non-empty collection
+  * @return the bracketed, comma-separated sample
+  */
+ static String sampleList(Collection<?> potentiallyLongList, int max) {
+   // the buffer never leaves this method, so the unsynchronized builder is sufficient
+   StringBuilder result = new StringBuilder("[");
+   int shown = 0;
+   for (Object obj : potentiallyLongList) {
+     if (shown >= max) {
+       // reached only when an element beyond the limit really exists
+       result.append("...");
+       break;
+     }
+     // separator goes before every element except the first, so nothing has to be trimmed later
+     if (shown > 0) {
+       result.append(", ");
+     }
+     result.append(obj);
+     shown++;
+   }
+   result.append("]");
+   return result.toString();
+ }
+
+}