You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/04/09 19:58:29 UTC
[58/64] [abbrv] Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
index 52688cb,0000000..7bc1a80
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
@@@ -1,71 -1,0 +1,67 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.compaction;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * The interface for customizing major compactions.
+ * <p>
+ * The tablet server has one thread to ask many tablets if they should compact. When the strategy returns true, the tablet is added to the queue of tablets
+ * waiting for a compaction thread. Once a thread is available, the {@link #gatherInformation(MajorCompactionRequest)} method is called outside the tablet's
+ * lock. This gives the strategy the ability to read information that may be expensive to fetch. Once gatherInformation returns, the tablet lock is grabbed
+ * and the compaction plan is computed. Computing the plan should *not* do expensive operations, especially not I/O. Note that the number of files may change between calls to
+ * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)}.
+ * <p>
+ * <b>Note:</b> the strategy object used for the {@link #shouldCompact(MajorCompactionRequest)} call is going to be different from the one used in the
+ * compaction thread.
+ */
+public abstract class CompactionStrategy {
+
+ /**
+ * The settings for the compaction strategy pulled from zookeeper. The <tt>table.compacations.major.strategy.opts</tt> part of the setting will be removed.
- *
- * @param options
+ */
+ public void init(Map<String,String> options) {}
+
+ /**
+ * Determine if this tablet is eligible for a major compaction. It's ok if it later determines (through {@link #gatherInformation(MajorCompactionRequest)} and
+ * {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when
+ * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called.
+ *
+ */
+ public abstract boolean shouldCompact(MajorCompactionRequest request) throws IOException;
+
+ /**
+ * Called prior to obtaining the tablet lock, useful for examining metadata or indexes. State collected during this method will be available during the call
+ * to {@link #getCompactionPlan(MajorCompactionRequest)}.
+ *
+ * @param request
+ * basic details about the tablet
- * @throws IOException
+ */
+ public void gatherInformation(MajorCompactionRequest request) throws IOException {}
+
+ /**
+ * Get the plan for compacting a tablet's files. Called while holding the tablet lock, so it should not be doing any blocking.
+ *
+ * @param request
+ * basic details about the tablet
+ * @return the plan for a major compaction, or null to cancel the compaction.
- * @throws IOException
+ */
+ abstract public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 25b8043,0000000..a1229e7
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@@ -1,170 -1,0 +1,169 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.logger;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
+import org.apache.accumulo.tserver.log.MultiReader;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+public class LogReader {
+
+ static class Opts extends Help {
+ @Parameter(names = "-r", description = "print only mutations associated with the given row")
+ String row;
+ @Parameter(names = "-m", description = "limit the number of mutations printed per row")
+ int maxMutations = 5;
+ @Parameter(names = "-t", description = "print only mutations that fall within the given key extent")
+ String extent;
+ @Parameter(names = "-p", description = "search for a row that matches the given regex")
+ String regexp;
+ @Parameter(description = "<logfile> { <logfile> ... }")
+ List<String> files = new ArrayList<String>();
+ }
+
+ /**
+ * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
+ *
+ * @param args
+ * - command-line options followed by one or more log files to print
- * @throws IOException
+ */
+ public static void main(String[] args) throws IOException {
+ Opts opts = new Opts();
+ opts.parseArgs(LogReader.class.getName(), args);
+ VolumeManager fs = VolumeManagerImpl.get();
+
+ Matcher rowMatcher = null;
+ KeyExtent ke = null;
+ Text row = null;
+ if (opts.files.isEmpty()) {
+ new JCommander(opts).usage();
+ return;
+ }
+ if (opts.row != null)
+ row = new Text(opts.row);
+ if (opts.extent != null) {
+ String sa[] = opts.extent.split(";");
+ ke = new KeyExtent(new Text(sa[0]), new Text(sa[1]), new Text(sa[2]));
+ }
+ if (opts.regexp != null) {
+ Pattern pattern = Pattern.compile(opts.regexp);
+ rowMatcher = pattern.matcher("");
+ }
+
+ Set<Integer> tabletIds = new HashSet<Integer>();
+
+ for (String file : opts.files) {
+
+ Path path = new Path(file);
+ LogFileKey key = new LogFileKey();
+ LogFileValue value = new LogFileValue();
+
+ if (fs.isFile(path)) {
+ // read log entries from a simple hdfs file
+ DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
+ DataInputStream input = streams.getDecryptingInputStream();
+
+ try {
+ while (true) {
+ try {
+ key.readFields(input);
+ value.readFields(input);
+ } catch (EOFException ex) {
+ break;
+ }
+ printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+ }
+ } finally {
+ input.close();
+ }
+ } else {
+ // read the log entries sorted in a map file
+ MultiReader input = new MultiReader(fs, path);
+ while (input.next(key, value)) {
+ printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+ }
+ }
+ }
+ }
+
+ public static void printLogEvent(LogFileKey key, LogFileValue value, Text row, Matcher rowMatcher, KeyExtent ke, Set<Integer> tabletIds, int maxMutations) {
+
+ if (ke != null) {
+ if (key.event == LogEvents.DEFINE_TABLET) {
+ if (key.tablet.equals(ke)) {
+ tabletIds.add(key.tid);
+ } else {
+ return;
+ }
+ } else if (!tabletIds.contains(key.tid)) {
+ return;
+ }
+ }
+
+ if (row != null || rowMatcher != null) {
+ if (key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) {
+ boolean found = false;
+ for (Mutation m : value.mutations) {
+ if (row != null && new Text(m.getRow()).equals(row)) {
+ found = true;
+ break;
+ }
+
+ if (rowMatcher != null) {
+ rowMatcher.reset(new String(m.getRow(), Constants.UTF8));
+ if (rowMatcher.matches()) {
+ found = true;
+ break;
+ }
+ }
+ }
+
+ if (!found)
+ return;
+ } else {
+ return;
+ }
+
+ }
+
+ System.out.println(key);
+ System.out.println(LogFileValue.format(value, maxMutations));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
index b7d5fa9,277c741..745401b
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
@@@ -36,15 -30,16 +30,16 @@@ public class PostDelegatingVFSClassLoad
super(files, manager, parent);
}
+ @Override
protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
Class<?> c = findLoadedClass(name);
- if (c == null) {
- try {
- // try finding this class here instead of parent
- findClass(name);
- } catch (ClassNotFoundException e) {
-
- }
+ if (c != null)
+ return c;
+ try {
+ // try finding this class here instead of parent
+ return findClass(name);
+ } catch (ClassNotFoundException e) {
+
}
return super.loadClass(name, resolve);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
index c9fd2f5,104ea09..2973750
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
@@@ -42,76 -42,36 +42,71 @@@ import org.apache.hadoop.fs.Path
*/
public class HdfsFileSystem extends AbstractFileSystem
{
- private static final Log log = LogFactory.getLog(HdfsFileSystem.class);
-
- private FileSystem fs;
-
- protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions)
+ private static final Log log = LogFactory.getLog(HdfsFileSystem.class);
+
+ private FileSystem fs;
+
- /**
- *
- * @param rootName
- * @param fileSystemOptions
- */
+ protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions)
+ {
+ super(rootName, null, fileSystemOptions);
+ }
+
+ /**
+ * @see org.apache.commons.vfs2.provider.AbstractFileSystem#addCapabilities(java.util.Collection)
+ */
+ @Override
+ protected void addCapabilities(final Collection<Capability> capabilities)
+ {
+ capabilities.addAll(HdfsFileProvider.CAPABILITIES);
+ }
+
+ /**
+ * @see org.apache.commons.vfs2.provider.AbstractFileSystem#close()
+ */
+ @Override
+ synchronized public void close()
+ {
+ try
{
- super(rootName, null, fileSystemOptions);
+ if (null != fs)
+ {
+ fs.close();
+ }
}
-
- /**
- * @see org.apache.commons.vfs2.provider.AbstractFileSystem#addCapabilities(java.util.Collection)
- */
- @Override
- protected void addCapabilities(final Collection<Capability> capabilities)
+ catch (final IOException e)
{
- capabilities.addAll(HdfsFileProvider.CAPABILITIES);
+ throw new RuntimeException("Error closing HDFS client", e);
}
-
- /**
- * @see org.apache.commons.vfs2.provider.AbstractFileSystem#close()
- */
- @Override
- public void close()
+ super.close();
+ }
+
+ /**
+ * @see org.apache.commons.vfs2.provider.AbstractFileSystem#createFile(org.apache.commons.vfs2.provider.AbstractFileName)
+ */
+ @Override
+ protected FileObject createFile(final AbstractFileName name) throws Exception
+ {
+ throw new FileSystemException("Operation not supported");
+ }
+
+ /**
+ * @see org.apache.commons.vfs2.provider.AbstractFileSystem#resolveFile(org.apache.commons.vfs2.FileName)
+ */
+ @Override
+ public FileObject resolveFile(final FileName name) throws FileSystemException
+ {
+
+ synchronized (this)
{
+ if (null == this.fs)
+ {
+ final String hdfsUri = name.getRootURI();
+ final Configuration conf = new Configuration(true);
+ conf.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri);
+ this.fs = null;
try
{
- if (null != fs)
- {
- fs.close();
- }
+ fs = org.apache.hadoop.fs.FileSystem.get(conf);
}
catch (final IOException e)
{
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/716ea0ee/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java
----------------------------------------------------------------------