Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/10/14 21:38:35 UTC

[2/2] accumulo git commit: ACCUMULO-4028 copy the list of online servers once

ACCUMULO-4028 copy the list of online servers once


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d7e7c643
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d7e7c643
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d7e7c643

Branch: refs/heads/1.7
Commit: d7e7c643c0f52c04175947955b59b173e0f62f58
Parents: 78e2c65 eacc27a
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed Oct 14 15:37:58 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Wed Oct 14 15:37:58 2015 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/accumulo/master/tableOps/LoadFiles.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
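For context on the one-line change in the diff below: the snapshot of online tablet servers, previously re-copied inside every per-file Callable, is now taken once before the submit loop and shared read-only by all tasks in that attempt. A minimal sketch of that pattern follows; the class and variable names here are illustrative stand-ins, not the Accumulo types or API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the bulk-import submit loop.
class CopyServersOnceSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<String> filesToLoad = Arrays.asList("f1.rf", "f2.rf", "f3.rf");
    List<String> onlineServers = Arrays.asList("ts1:9997", "ts2:9997");

    // Copy the (potentially changing) membership view once per attempt...
    final String[] servers = onlineServers.toArray(new String[0]);
    final Random random = new Random();

    List<Future<String>> results = new ArrayList<Future<String>>();
    for (final String file : filesToLoad) {
      results.add(pool.submit(new Callable<String>() {
        @Override
        public String call() {
          // ...and only read the shared snapshot inside each task, instead of
          // re-copying the live server list for every file.
          String server = servers[random.nextInt(servers.length)];
          return "would ask " + server + " to bulk import " + file;
        }
      }));
    }
    for (Future<String> f : results)
      System.out.println(f.get());
    pool.shutdown();
  }
}

Note that in the real code the array is rebuilt inside the retry loop, after waiting for at least one tablet server to be online, so each attempt still works from a fresh snapshot.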


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d7e7c643/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index a80e1ff,0000000..af5262a
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@@ -1,213 -1,0 +1,213 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.io.BufferedWriter;
 +import java.io.OutputStreamWriter;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ThreadPoolExecutor;
 +
 +import org.apache.accumulo.core.client.impl.thrift.ClientService;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.trace.Tracer;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.htrace.wrappers.TraceExecutorService;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.net.HostAndPort;
 +
 +class LoadFiles extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private static ExecutorService threadPool = null;
 +  private static final Logger log = LoggerFactory.getLogger(LoadFiles.class);
 +
 +  private String tableId;
 +  private String source;
 +  private String bulk;
 +  private String errorDir;
 +  private boolean setTime;
 +
 +  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
 +    this.tableId = tableId;
 +    this.source = source;
 +    this.bulk = bulk;
 +    this.errorDir = errorDir;
 +    this.setTime = setTime;
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    if (master.onlineTabletServers().size() == 0)
 +      return 500;
 +    return 0;
 +  }
 +
 +  private static synchronized ExecutorService getThreadPool(Master master) {
 +    if (threadPool == null) {
 +      int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
 +      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
 +      pool.allowCoreThreadTimeOut(true);
 +      threadPool = new TraceExecutorService(pool);
 +    }
 +    return threadPool;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(final long tid, final Master master) throws Exception {
 +    ExecutorService executor = getThreadPool(master);
 +    final AccumuloConfiguration conf = master.getConfiguration();
 +    VolumeManager fs = master.getFileSystem();
 +    List<FileStatus> files = new ArrayList<FileStatus>();
 +    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
 +      files.add(entry);
 +    }
 +    log.debug("tid " + tid + " importing " + files.size() + " files");
 +
 +    Path writable = new Path(this.errorDir, ".iswritable");
 +    if (!fs.createNewFile(writable)) {
 +      // Maybe this is a re-try... clear the flag and try again
 +      fs.delete(writable);
 +      if (!fs.createNewFile(writable))
 +        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
 +            "Unable to write to " + this.errorDir);
 +    }
 +    fs.delete(writable);
 +
 +    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
 +    for (FileStatus f : files)
 +      filesToLoad.add(f.getPath().toString());
 +
 +    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
 +    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
 +      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
 +
 +      if (master.onlineTabletServers().size() == 0)
 +        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
 +
 +      while (master.onlineTabletServers().size() == 0) {
 +        UtilWaitThread.sleep(500);
 +      }
 +
 +      // Use the threadpool to assign files one-at-a-time to the server
 +      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
 +      final Random random = new Random();
++      final TServerInstance[] servers = master.onlineTabletServers().toArray(new TServerInstance[0]);
 +      for (final String file : filesToLoad) {
 +        results.add(executor.submit(new Callable<List<String>>() {
 +          @Override
 +          public List<String> call() {
 +            List<String> failures = new ArrayList<String>();
 +            ClientService.Client client = null;
 +            HostAndPort server = null;
 +            try {
 +              // get a connection to a random tablet server, do not prefer cached connections because
 +              // this is running on the master and there are lots of connections to tablet servers
 +              // serving the metadata tablets
 +              long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
 +              // Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
-               TServerInstance[] servers = master.onlineTabletServers().toArray(new TServerInstance[0]);
 +              server = servers[random.nextInt(servers.length)].getLocation();
 +              client = ThriftUtil.getTServerClient(server, master, timeInMillis);
 +              List<String> attempt = Collections.singletonList(file);
 +              log.debug("Asking " + server + " to bulk import " + file);
 +              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
 +              if (fail.isEmpty()) {
 +                loaded.add(file);
 +              } else {
 +                failures.addAll(fail);
 +              }
 +            } catch (Exception ex) {
 +              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
 +            } finally {
 +              ThriftUtil.returnClient(client);
 +            }
 +            return failures;
 +          }
 +        }));
 +      }
 +      Set<String> failures = new HashSet<String>();
 +      for (Future<List<String>> f : results)
 +        failures.addAll(f.get());
 +      filesToLoad.removeAll(loaded);
 +      if (filesToLoad.size() > 0) {
 +        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
 +        UtilWaitThread.sleep(100);
 +      }
 +    }
 +
 +    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
 +    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
 +    try {
 +      for (String f : filesToLoad) {
 +        out.write(f);
 +        out.write("\n");
 +      }
 +    } finally {
 +      out.close();
 +    }
 +
 +    // return the next step, which will perform cleanup
 +    return new CompleteBulkImport(tableId, source, bulk, errorDir);
 +  }
 +
 +  static String sampleList(Collection<?> potentiallyLongList, int max) {
 +    StringBuffer result = new StringBuffer();
 +    result.append("[");
 +    int i = 0;
 +    for (Object obj : potentiallyLongList) {
 +      result.append(obj);
 +      if (i >= max) {
 +        result.append("...");
 +        break;
 +      } else {
 +        result.append(", ");
 +      }
 +      i++;
 +    }
 +    if (i < max)
 +      result.delete(result.length() - 2, result.length());
 +    result.append("]");
 +    return result.toString();
 +  }
 +
 +}