You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/04/09 19:58:29 UTC

[58/64] [abbrv] Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
index 52688cb,0000000..7bc1a80
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
@@@ -1,71 -1,0 +1,67 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.compaction;
 +
 +import java.io.IOException;
 +import java.util.Map;
 +
 +/**
 + * The interface for customizing major compactions.
 + * <p>
 + * The tablet server has one thread to ask many tablets if they should compact. When the strategy returns true, then the tablet is added to the queue of tablets
 + * waiting for a compaction thread. Once a thread is available, the {@link #gatherInformation(MajorCompactionRequest)} method is called outside the tablets'
 + * lock. This gives the strategy the ability to read information that may be expensive to fetch. Once gatherInformation returns, the tablet lock is grabbed
 + * and the compactionPlan computed. This should *not* do expensive operations, especially not I/O. Note that the number of files may change between calls to
 + * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)}.
 + * <p>
 + * <b>Note:</b> the strategy object used for the {@link #shouldCompact(MajorCompactionRequest)} call is going to be different from the one used in the
 + * compaction thread.
 + */
 +public abstract class CompactionStrategy {
 +
 +  /**
 +   * The settings for the compaction strategy pulled from zookeeper. The <tt>table.compacations.major.strategy.opts</tt> part of the setting will be removed.
-    * 
-    * @param options
 +   */
 +  public void init(Map<String,String> options) {}
 +
 +  /**
 +   * Determine if this tablet is eligible for a major compaction. It's ok if it later determines (through {@link #gatherInformation(MajorCompactionRequest)} and
 +   * {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when
 +   * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called.
 +   * 
 +   */
 +  public abstract boolean shouldCompact(MajorCompactionRequest request) throws IOException;
 +
 +  /**
 +   * Called prior to obtaining the tablet lock, useful for examining metadata or indexes. State collected during this method will be available during the call
 +   * to {@link #getCompactionPlan(MajorCompactionRequest)}.
 +   * 
 +   * @param request
 +   *          basic details about the tablet
-    * @throws IOException
 +   */
 +  public void gatherInformation(MajorCompactionRequest request) throws IOException {}
 +
 +  /**
 +   * Get the plan for compacting a tablet's files. Called while holding the tablet lock, so it should not be doing any blocking.
 +   * 
 +   * @param request
 +   *          basic details about the tablet
 +   * @return the plan for a major compaction, or null to cancel the compaction.
-    * @throws IOException
 +   */
 +  abstract public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException;
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 25b8043,0000000..a1229e7
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@@ -1,170 -1,0 +1,169 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.logger;
 +
 +import java.io.DataInputStream;
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.Help;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.tserver.log.DfsLogger;
 +import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 +import org.apache.accumulo.tserver.log.MultiReader;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.JCommander;
 +import com.beust.jcommander.Parameter;
 +
 +public class LogReader {
 +
 +  static class Opts extends Help {
 +    @Parameter(names = "-r", description = "print only mutations associated with the given row")
 +    String row;
 +    @Parameter(names = "-m", description = "limit the number of mutations printed per row")
 +    int maxMutations = 5;
 +    @Parameter(names = "-t", description = "print only mutations that fall within the given key extent")
 +    String extent;
 +    @Parameter(names = "-p", description = "search for a row that matches the given regex")
 +    String regexp;
 +    @Parameter(description = "<logfile> { <logfile> ... }")
 +    List<String> files = new ArrayList<String>();
 +  }
 +
 +  /**
 +   * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
 +   * 
 +   * @param args
 +   *          - first argument is the file to print
-    * @throws IOException
 +   */
 +  public static void main(String[] args) throws IOException {
 +    Opts opts = new Opts();
 +    opts.parseArgs(LogReader.class.getName(), args);
 +    VolumeManager fs = VolumeManagerImpl.get();
 +
 +    Matcher rowMatcher = null;
 +    KeyExtent ke = null;
 +    Text row = null;
 +    if (opts.files.isEmpty()) {
 +      new JCommander(opts).usage();
 +      return;
 +    }
 +    if (opts.row != null)
 +      row = new Text(opts.row);
 +    if (opts.extent != null) {
 +      String sa[] = opts.extent.split(";");
 +      ke = new KeyExtent(new Text(sa[0]), new Text(sa[1]), new Text(sa[2]));
 +    }
 +    if (opts.regexp != null) {
 +      Pattern pattern = Pattern.compile(opts.regexp);
 +      rowMatcher = pattern.matcher("");
 +    }
 +
 +    Set<Integer> tabletIds = new HashSet<Integer>();
 +
 +    for (String file : opts.files) {
 +
 +      Path path = new Path(file);
 +      LogFileKey key = new LogFileKey();
 +      LogFileValue value = new LogFileValue();
 +
 +      if (fs.isFile(path)) {
 +        // read log entries from a simple hdfs file
 +        DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
 +        DataInputStream input = streams.getDecryptingInputStream();
 +
 +        try {
 +          while (true) {
 +            try {
 +              key.readFields(input);
 +              value.readFields(input);
 +            } catch (EOFException ex) {
 +              break;
 +            }
 +            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
 +          }
 +        } finally {
 +          input.close();
 +        }
 +      } else {
 +        // read the log entries sorted in a map file
 +        MultiReader input = new MultiReader(fs, path);
 +        while (input.next(key, value)) {
 +          printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
 +        }
 +      }
 +    }
 +  }
 +
 +  public static void printLogEvent(LogFileKey key, LogFileValue value, Text row, Matcher rowMatcher, KeyExtent ke, Set<Integer> tabletIds, int maxMutations) {
 +
 +    if (ke != null) {
 +      if (key.event == LogEvents.DEFINE_TABLET) {
 +        if (key.tablet.equals(ke)) {
 +          tabletIds.add(key.tid);
 +        } else {
 +          return;
 +        }
 +      } else if (!tabletIds.contains(key.tid)) {
 +        return;
 +      }
 +    }
 +
 +    if (row != null || rowMatcher != null) {
 +      if (key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) {
 +        boolean found = false;
 +        for (Mutation m : value.mutations) {
 +          if (row != null && new Text(m.getRow()).equals(row)) {
 +            found = true;
 +            break;
 +          }
 +
 +          if (rowMatcher != null) {
 +            rowMatcher.reset(new String(m.getRow(), Constants.UTF8));
 +            if (rowMatcher.matches()) {
 +              found = true;
 +              break;
 +            }
 +          }
 +        }
 +
 +        if (!found)
 +          return;
 +      } else {
 +        return;
 +      }
 +
 +    }
 +
 +    System.out.println(key);
 +    System.out.println(LogFileValue.format(value, maxMutations));
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
index b7d5fa9,277c741..745401b
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
@@@ -36,15 -30,16 +30,16 @@@ public class PostDelegatingVFSClassLoad
      super(files, manager, parent);
    }
    
+   @Override
    protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
      Class<?> c = findLoadedClass(name);
 -    if (c == null) {
 -      try {
 -        // try finding this class here instead of parent
 -        findClass(name);
 -      } catch (ClassNotFoundException e) {
 -
 -      }
 +    if (c != null)
 +      return c;
 +    try {
 +      // try finding this class here instead of parent
 +      return findClass(name);
 +    } catch (ClassNotFoundException e) {
 +      
      }
      return super.loadClass(name, resolve);
    }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
index c9fd2f5,104ea09..2973750
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
@@@ -42,76 -42,36 +42,71 @@@ import org.apache.hadoop.fs.Path
   */
  public class HdfsFileSystem extends AbstractFileSystem
  {
 -    private static final Log log = LogFactory.getLog(HdfsFileSystem.class);
 -
 -    private FileSystem fs;
 -
 -    protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions)
 +  private static final Log log = LogFactory.getLog(HdfsFileSystem.class);
 +  
 +  private FileSystem fs;
 +  
-   /**
-    * 
-    * @param rootName
-    * @param fileSystemOptions
-    */
 +  protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions)
 +  {
 +    super(rootName, null, fileSystemOptions);
 +  }
 +  
 +  /**
 +   * @see org.apache.commons.vfs2.provider.AbstractFileSystem#addCapabilities(java.util.Collection)
 +   */
 +  @Override
 +  protected void addCapabilities(final Collection<Capability> capabilities)
 +  {
 +    capabilities.addAll(HdfsFileProvider.CAPABILITIES);
 +  }
 +  
 +  /**
 +   * @see org.apache.commons.vfs2.provider.AbstractFileSystem#close()
 +   */
 +  @Override
 +  synchronized public void close()
 +  {
 +    try
      {
 -        super(rootName, null, fileSystemOptions);
 +      if (null != fs)
 +      {
 +        fs.close();
 +      }
      }
 -
 -    /**
 -     * @see org.apache.commons.vfs2.provider.AbstractFileSystem#addCapabilities(java.util.Collection)
 -     */
 -    @Override
 -    protected void addCapabilities(final Collection<Capability> capabilities)
 +    catch (final IOException e)
      {
 -        capabilities.addAll(HdfsFileProvider.CAPABILITIES);
 +      throw new RuntimeException("Error closing HDFS client", e);
      }
 -
 -    /**
 -     * @see org.apache.commons.vfs2.provider.AbstractFileSystem#close()
 -     */
 -    @Override
 -    public void close()
 +    super.close();
 +  }
 +  
 +  /**
 +   * @see org.apache.commons.vfs2.provider.AbstractFileSystem#createFile(org.apache.commons.vfs2.provider.AbstractFileName)
 +   */
 +  @Override
 +  protected FileObject createFile(final AbstractFileName name) throws Exception
 +  {
 +    throw new FileSystemException("Operation not supported");
 +  }
 +  
 +  /**
 +   * @see org.apache.commons.vfs2.provider.AbstractFileSystem#resolveFile(org.apache.commons.vfs2.FileName)
 +   */
 +  @Override
 +  public FileObject resolveFile(final FileName name) throws FileSystemException
 +  {
 +    
 +    synchronized (this)
      {
 +      if (null == this.fs)
 +      {
 +        final String hdfsUri = name.getRootURI();
 +        final Configuration conf = new Configuration(true);
 +        conf.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri);
 +        this.fs = null;
          try
          {
 -            if (null != fs)
 -            {
 -                fs.close();
 -            }
 +          fs = org.apache.hadoop.fs.FileSystem.get(conf);
          }
          catch (final IOException e)
          {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java
----------------------------------------------------------------------