You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/19 10:50:50 UTC
[20/51] [partial] ignite git commit: IGNITE-3916: Created separate
module.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
deleted file mode 100644
index bd8ed2d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ /dev/null
@@ -1,1076 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.fs.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.AbstractFileSystem;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.FsStatus;
-import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.Progressable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsMode;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper;
-import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
-import org.apache.ignite.internal.processors.igfs.IgfsPaths;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
-import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
-import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser;
-import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME;
-
-/**
- * {@code IGFS} Hadoop 2.x file system driver over file system API. To use
- * {@code IGFS} as Hadoop file system, you should configure this class
- * in Hadoop's {@code core-site.xml} as follows:
- * <pre name="code" class="xml">
- * <property>
- * <name>fs.default.name</name>
- * <value>igfs://ipc</value>
- * </property>
- *
- * <property>
- * <name>fs.igfs.impl</name>
- * <value>org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem</value>
- * </property>
- * </pre>
- * You should also add Ignite JAR and all libraries to Hadoop classpath. To
- * do this, add following lines to {@code conf/hadoop-env.sh} script in Hadoop
- * distribution:
- * <pre name="code" class="bash">
- * export IGNITE_HOME=/path/to/Ignite/distribution
- * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
- *
- * for f in $IGNITE_HOME/libs/*.jar; do
- * export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
- * done
- * </pre>
- * <h1 class="header">Data vs Clients Nodes</h1>
- * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on
- * data nodes. Client nodes are responsible for basic file system operations as well as
- * accessing data nodes remotely. Usually, client nodes are started together
- * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually
- * started together with Hadoop {@code task-tracker} processes.
- * <p>
- * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml}
- * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation.
- */
-public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closeable {
- /** Logger. */
- private static final Log LOG = LogFactory.getLog(IgniteHadoopFileSystem.class);
-
- /** Ensures that close routine is invoked at most once. */
- private final AtomicBoolean closeGuard = new AtomicBoolean();
-
- /** Grid remote client. */
- private HadoopIgfsWrapper rmtClient;
-
- /** The name of the user this File System created on behalf of. */
- private final String user;
-
- /** Working directory. */
- private IgfsPath workingDir;
-
- /** URI. */
- private final URI uri;
-
- /** Authority. */
- private String uriAuthority;
-
- /** Client logger. */
- private IgfsLogger clientLog;
-
- /** Server block size. */
- private long grpBlockSize;
-
- /** Default replication factor. */
- private short dfltReplication;
-
- /** Secondary URI string. */
- private URI secondaryUri;
-
- /** Mode resolver. */
- private IgfsModeResolver modeRslvr;
-
- /** The secondary file system factory. */
- private HadoopFileSystemFactory factory;
-
- /** Whether custom sequential reads before prefetch value is provided. */
- private boolean seqReadsBeforePrefetchOverride;
-
- /** Custom-provided sequential reads before prefetch. */
- private int seqReadsBeforePrefetch;
-
- /** Flag that controls whether file writes should be colocated on data node. */
- private boolean colocateFileWrites;
-
- /** Prefer local writes. */
- private boolean preferLocFileWrites;
-
- /**
- * @param name URI for file system.
- * @param cfg Configuration.
- * @throws URISyntaxException if name has invalid syntax.
- * @throws IOException If initialization failed.
- */
- public IgniteHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException {
- super(HadoopIgfsEndpoint.normalize(name), IGFS_SCHEME, false, -1);
-
- uri = name;
-
- user = getFsHadoopUser();
-
- try {
- initialize(name, cfg);
- }
- catch (IOException e) {
- // Close client if exception occurred.
- if (rmtClient != null)
- rmtClient.close(false);
-
- throw e;
- }
-
- workingDir = new IgfsPath("/user/" + user);
- }
-
- /** {@inheritDoc} */
- @Override public void checkPath(Path path) {
- URI uri = path.toUri();
-
- if (uri.isAbsolute()) {
- if (!F.eq(uri.getScheme(), IGFS_SCHEME))
- throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" +
- uri.getAuthority() + ']');
-
- if (!F.eq(uri.getAuthority(), uriAuthority))
- throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" +
- uri.getAuthority() + ']');
- }
- }
-
- /**
- * Public setter that can be used by direct users of FS or Visor.
- *
- * @param colocateFileWrites Whether all ongoing file writes should be colocated.
- */
- @SuppressWarnings("UnusedDeclaration")
- public void colocateFileWrites(boolean colocateFileWrites) {
- this.colocateFileWrites = colocateFileWrites;
- }
-
- /**
- * Enter busy state.
- *
- * @throws IOException If file system is stopped.
- */
- private void enterBusy() throws IOException {
- if (closeGuard.get())
- throw new IOException("File system is stopped.");
- }
-
- /**
- * Leave busy state.
- */
- private void leaveBusy() {
- // No-op.
- }
-
- /**
- * @param name URI passed to constructor.
- * @param cfg Configuration passed to constructor.
- * @throws IOException If initialization failed.
- */
- @SuppressWarnings("ConstantConditions")
- private void initialize(URI name, Configuration cfg) throws IOException {
- enterBusy();
-
- try {
- if (rmtClient != null)
- throw new IOException("File system is already initialized: " + rmtClient);
-
- A.notNull(name, "name");
- A.notNull(cfg, "cfg");
-
- if (!IGFS_SCHEME.equals(name.getScheme()))
- throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME +
- "://[name]/[optional_path], actual=" + name + ']');
-
- uriAuthority = name.getAuthority();
-
- // Override sequential reads before prefetch if needed.
- seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
-
- if (seqReadsBeforePrefetch > 0)
- seqReadsBeforePrefetchOverride = true;
-
- // In Ignite replication factor is controlled by data cache affinity.
- // We use replication factor to force the whole file to be stored on local node.
- dfltReplication = (short)cfg.getInt("dfs.replication", 3);
-
- // Get file colocation control flag.
- colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false);
- preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false);
-
- // Get log directory.
- String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR);
-
- File logDirFile = U.resolveIgnitePath(logDirCfg);
-
- String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
-
- rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
-
- // Handshake.
- IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
-
- grpBlockSize = handshake.blockSize();
-
- IgfsPaths paths = handshake.secondaryPaths();
-
- Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false);
-
- if (handshake.sampling() != null ? handshake.sampling() : logEnabled) {
- // Initiate client logger.
- if (logDir == null)
- throw new IOException("Failed to resolve log directory: " + logDirCfg);
-
- Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE);
-
- clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize);
- }
- else
- clientLog = IgfsLogger.disabledLogger();
-
- try {
- modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes());
- }
- catch (IgniteCheckedException ice) {
- throw new IOException(ice);
- }
-
- boolean initSecondary = paths.defaultMode() == PROXY;
-
- if (!initSecondary && paths.pathModes() != null) {
- for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
- IgfsMode mode = pathMode.getValue();
-
- if (mode == PROXY) {
- initSecondary = true;
-
- break;
- }
- }
- }
-
- if (initSecondary) {
- try {
- factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to get secondary file system factory.", e);
- }
-
- if (factory == null)
- throw new IOException("Failed to get secondary file system factory (did you set " +
- IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFIleSystem\" in " +
- FileSystemConfiguration.class.getName() + "?)");
-
- assert factory != null;
-
- if (factory instanceof LifecycleAware)
- ((LifecycleAware) factory).start();
-
- try {
- FileSystem secFs = factory.get(user);
-
- secondaryUri = secFs.getUri();
-
- A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
- }
- catch (IOException e) {
- throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
- }
- }
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IOException {
- if (closeGuard.compareAndSet(false, true)) {
- if (rmtClient == null)
- return;
-
- rmtClient.close(false);
-
- if (clientLog.isLogEnabled())
- clientLog.close();
-
- if (factory instanceof LifecycleAware)
- ((LifecycleAware) factory).stop();
-
- // Reset initialized resources.
- rmtClient = null;
- }
- }
-
- /** {@inheritDoc} */
- @Override public URI getUri() {
- return uri;
- }
-
- /** {@inheritDoc} */
- @Override public int getUriDefaultPort() {
- return -1;
- }
-
- /** {@inheritDoc} */
- @Override public FsServerDefaults getServerDefaults() throws IOException {
- return new FsServerDefaults(grpBlockSize, (int)grpBlockSize, (int)grpBlockSize, dfltReplication, 64 * 1024,
- false, 0, DataChecksum.Type.NULL);
- }
-
- /** {@inheritDoc} */
- @Override public boolean setReplication(Path f, short replication) throws IOException {
- return mode(f) == PROXY && secondaryFileSystem().setReplication(f, replication);
- }
-
- /** {@inheritDoc} */
- @Override public void setTimes(Path f, long mtime, long atime) throws IOException {
- if (mode(f) == PROXY)
- secondaryFileSystem().setTimes(f, mtime, atime);
- else {
- if (mtime == -1 && atime == -1)
- return;
-
- rmtClient.setTimes(convert(f), atime, mtime);
- }
- }
-
- /** {@inheritDoc} */
- @Override public FsStatus getFsStatus() throws IOException {
- IgfsStatus status = rmtClient.fsStatus();
-
- return new FsStatus(status.spaceTotal(), status.spaceUsed(), status.spaceTotal() - status.spaceUsed());
- }
-
- /** {@inheritDoc} */
- @Override public void setPermission(Path p, FsPermission perm) throws IOException {
- enterBusy();
-
- try {
- A.notNull(p, "p");
-
- if (mode(p) == PROXY)
- secondaryFileSystem().setPermission(toSecondary(p), perm);
- else {
- if (rmtClient.update(convert(p), permission(perm)) == null)
- throw new IOException("Failed to set file permission (file not found?)" +
- " [path=" + p + ", perm=" + perm + ']');
- }
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void setOwner(Path p, String usr, String grp) throws IOException {
- A.notNull(p, "p");
- A.notNull(usr, "username");
- A.notNull(grp, "grpName");
-
- enterBusy();
-
- try {
- if (mode(p) == PROXY)
- secondaryFileSystem().setOwner(toSecondary(p), usr, grp);
- else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, usr,
- IgfsUtils.PROP_GROUP_NAME, grp)) == null) {
- throw new IOException("Failed to set file permission (file not found?)" +
- " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
- }
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public FSDataInputStream open(Path f, int bufSize) throws IOException {
- A.notNull(f, "f");
-
- enterBusy();
-
- try {
- IgfsPath path = convert(f);
- IgfsMode mode = modeRslvr.resolveMode(path);
-
- if (mode == PROXY) {
- FSDataInputStream is = secondaryFileSystem().open(toSecondary(f), bufSize);
-
- if (clientLog.isLogEnabled()) {
- // At this point we do not know file size, so we perform additional request to remote FS to get it.
- FileStatus status = secondaryFileSystem().getFileStatus(toSecondary(f));
-
- long size = status != null ? status.getLen() : -1;
-
- long logId = IgfsLogger.nextId();
-
- clientLog.logOpen(logId, path, PROXY, bufSize, size);
-
- return new FSDataInputStream(new HadoopIgfsProxyInputStream(is, clientLog, logId));
- }
- else
- return is;
- }
- else {
- HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ?
- rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path);
-
- long logId = -1;
-
- if (clientLog.isLogEnabled()) {
- logId = IgfsLogger.nextId();
-
- clientLog.logOpen(logId, path, mode, bufSize, stream.length());
- }
-
- if (LOG.isDebugEnabled())
- LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path +
- ", bufSize=" + bufSize + ']');
-
- HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(),
- bufSize, LOG, clientLog, logId);
-
- if (LOG.isDebugEnabled())
- LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']');
-
- return new FSDataInputStream(igfsIn);
- }
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("deprecation")
- @Override public FSDataOutputStream createInternal(
- Path f,
- EnumSet<CreateFlag> flag,
- FsPermission perm,
- int bufSize,
- short replication,
- long blockSize,
- Progressable progress,
- Options.ChecksumOpt checksumOpt,
- boolean createParent
- ) throws IOException {
- A.notNull(f, "f");
-
- enterBusy();
-
- boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
- boolean append = flag.contains(CreateFlag.APPEND);
- boolean create = flag.contains(CreateFlag.CREATE);
-
- OutputStream out = null;
-
- try {
- IgfsPath path = convert(f);
- IgfsMode mode = modeRslvr.resolveMode(path);
-
- if (LOG.isDebugEnabled())
- LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" +
- path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
-
- if (mode == PROXY) {
- FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize,
- replication, blockSize, progress);
-
- if (clientLog.isLogEnabled()) {
- long logId = IgfsLogger.nextId();
-
- if (append)
- clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID.
- else
- clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize);
-
- return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId));
- }
- else
- return os;
- }
- else {
- Map<String, String> permMap = F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm),
- IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
-
- // Create stream and close it in the 'finally' section if any sequential operation failed.
- HadoopIgfsStreamDelegate stream;
-
- long logId = -1;
-
- if (append) {
- stream = rmtClient.append(path, create, permMap);
-
- if (clientLog.isLogEnabled()) {
- logId = IgfsLogger.nextId();
-
- clientLog.logAppend(logId, path, mode, bufSize);
- }
-
- if (LOG.isDebugEnabled())
- LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']');
- }
- else {
- stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize,
- permMap);
-
- if (clientLog.isLogEnabled()) {
- logId = IgfsLogger.nextId();
-
- clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize);
- }
-
- if (LOG.isDebugEnabled())
- LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']');
- }
-
- assert stream != null;
-
- HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG,
- clientLog, logId);
-
- bufSize = Math.max(64 * 1024, bufSize);
-
- out = new BufferedOutputStream(igfsOut, bufSize);
-
- FSDataOutputStream res = new FSDataOutputStream(out, null, 0);
-
- // Mark stream created successfully.
- out = null;
-
- return res;
- }
- }
- finally {
- // Close if failed during stream creation.
- if (out != null)
- U.closeQuiet(out);
-
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean supportsSymlinks() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void renameInternal(Path src, Path dst) throws IOException {
- A.notNull(src, "src");
- A.notNull(dst, "dst");
-
- enterBusy();
-
- try {
- IgfsPath srcPath = convert(src);
- IgfsPath dstPath = convert(dst);
-
- IgfsMode srcMode = modeRslvr.resolveMode(srcPath);
-
- if (clientLog.isLogEnabled())
- clientLog.logRename(srcPath, srcMode, dstPath);
-
- if (srcMode == PROXY)
- secondaryFileSystem().rename(toSecondary(src), toSecondary(dst));
- else
- rmtClient.rename(srcPath, dstPath);
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean delete(Path f, boolean recursive) throws IOException {
- A.notNull(f, "f");
-
- enterBusy();
-
- try {
- IgfsPath path = convert(f);
-
- IgfsMode mode = modeRslvr.resolveMode(path);
-
- if (mode == PROXY) {
- if (clientLog.isLogEnabled())
- clientLog.logDelete(path, PROXY, recursive);
-
- return secondaryFileSystem().delete(toSecondary(f), recursive);
- }
-
- boolean res = rmtClient.delete(path, recursive);
-
- if (clientLog.isLogEnabled())
- clientLog.logDelete(path, mode, recursive);
-
- return res;
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException {
- // Checksum has effect for secondary FS only.
- if (factory != null)
- secondaryFileSystem().setVerifyChecksum(verifyChecksum);
- }
-
- /** {@inheritDoc} */
- @Override public FileChecksum getFileChecksum(Path f) throws IOException {
- if (mode(f) == PROXY)
- return secondaryFileSystem().getFileChecksum(f);
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public FileStatus[] listStatus(Path f) throws IOException {
- A.notNull(f, "f");
-
- enterBusy();
-
- try {
- IgfsPath path = convert(f);
- IgfsMode mode = modeRslvr.resolveMode(path);
-
- if (mode == PROXY) {
- FileStatus[] arr = secondaryFileSystem().listStatus(toSecondary(f));
-
- if (arr == null)
- throw new FileNotFoundException("File " + f + " does not exist.");
-
- for (int i = 0; i < arr.length; i++)
- arr[i] = toPrimary(arr[i]);
-
- if (clientLog.isLogEnabled()) {
- String[] fileArr = new String[arr.length];
-
- for (int i = 0; i < arr.length; i++)
- fileArr[i] = arr[i].getPath().toString();
-
- clientLog.logListDirectory(path, PROXY, fileArr);
- }
-
- return arr;
- }
- else {
- Collection<IgfsFile> list = rmtClient.listFiles(path);
-
- if (list == null)
- throw new FileNotFoundException("File " + f + " does not exist.");
-
- List<IgfsFile> files = new ArrayList<>(list);
-
- FileStatus[] arr = new FileStatus[files.size()];
-
- for (int i = 0; i < arr.length; i++)
- arr[i] = convert(files.get(i));
-
- if (clientLog.isLogEnabled()) {
- String[] fileArr = new String[arr.length];
-
- for (int i = 0; i < arr.length; i++)
- fileArr[i] = arr[i].getPath().toString();
-
- clientLog.logListDirectory(path, mode, fileArr);
- }
-
- return arr;
- }
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void mkdir(Path f, FsPermission perm, boolean createParent) throws IOException {
- A.notNull(f, "f");
-
- enterBusy();
-
- try {
- IgfsPath path = convert(f);
- IgfsMode mode = modeRslvr.resolveMode(path);
-
- if (mode == PROXY) {
- if (clientLog.isLogEnabled())
- clientLog.logMakeDirectory(path, PROXY);
-
- secondaryFileSystem().mkdirs(toSecondary(f), perm);
- }
- else {
- rmtClient.mkdirs(path, permission(perm));
-
- if (clientLog.isLogEnabled())
- clientLog.logMakeDirectory(path, mode);
- }
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public FileStatus getFileStatus(Path f) throws IOException {
- A.notNull(f, "f");
-
- enterBusy();
-
- try {
- if (mode(f) == PROXY)
- return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f)));
- else {
- IgfsFile info = rmtClient.info(convert(f));
-
- if (info == null)
- throw new FileNotFoundException("File not found: " + f);
-
- return convert(info);
- }
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException {
- A.notNull(path, "path");
-
- IgfsPath igfsPath = convert(path);
-
- enterBusy();
-
- try {
- if (modeRslvr.resolveMode(igfsPath) == PROXY)
- return secondaryFileSystem().getFileBlockLocations(path, start, len);
- else {
- long now = System.currentTimeMillis();
-
- List<IgfsBlockLocation> affinity = new ArrayList<>(
- rmtClient.affinity(igfsPath, start, len));
-
- BlockLocation[] arr = new BlockLocation[affinity.size()];
-
- for (int i = 0; i < arr.length; i++)
- arr[i] = convert(affinity.get(i));
-
- if (LOG.isDebugEnabled())
- LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" +
- (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']');
-
- return arr;
- }
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * Resolve path mode.
- *
- * @param path HDFS path.
- * @return Path mode.
- */
- public IgfsMode mode(Path path) {
- return modeRslvr.resolveMode(convert(path));
- }
-
- /**
- * Convert the given path to path acceptable by the primary file system.
- *
- * @param path Path.
- * @return Primary file system path.
- */
- private Path toPrimary(Path path) {
- return convertPath(path, getUri());
- }
-
- /**
- * Convert the given path to path acceptable by the secondary file system.
- *
- * @param path Path.
- * @return Secondary file system path.
- */
- private Path toSecondary(Path path) {
- assert factory != null;
- assert secondaryUri != null;
-
- return convertPath(path, secondaryUri);
- }
-
- /**
- * Convert path using the given new URI.
- *
- * @param path Old path.
- * @param newUri New URI.
- * @return New path.
- */
- private Path convertPath(Path path, URI newUri) {
- assert newUri != null;
-
- if (path != null) {
- URI pathUri = path.toUri();
-
- try {
- return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null,
- pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null));
- }
- catch (URISyntaxException e) {
- throw new IgniteException("Failed to construct secondary file system path from the primary file " +
- "system path: " + path, e);
- }
- }
- else
- return null;
- }
-
- /**
- * Convert a file status obtained from the secondary file system to a status of the primary file system.
- *
- * @param status Secondary file system status.
- * @return Primary file system status.
- */
- private FileStatus toPrimary(FileStatus status) {
- return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(),
- status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(),
- status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null;
- }
-
- /**
- * Convert IGFS path into Hadoop path.
- *
- * @param path IGFS path.
- * @return Hadoop path.
- */
- private Path convert(IgfsPath path) {
- return new Path(IGFS_SCHEME, uriAuthority, path.toString());
- }
-
- /**
- * Convert Hadoop path into IGFS path.
- *
- * @param path Hadoop path.
- * @return IGFS path.
- */
- @Nullable private IgfsPath convert(Path path) {
- if (path == null)
- return null;
-
- return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
- new IgfsPath(workingDir, path.toUri().getPath());
- }
-
- /**
- * Convert IGFS affinity block location into Hadoop affinity block location.
- *
- * @param block IGFS affinity block location.
- * @return Hadoop affinity block location.
- */
- private BlockLocation convert(IgfsBlockLocation block) {
- Collection<String> names = block.names();
- Collection<String> hosts = block.hosts();
-
- return new BlockLocation(
- names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */,
- hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */,
- block.start(), block.length()
- ) {
- @Override public String toString() {
- try {
- return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() +
- ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']';
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- /**
- * Convert IGFS file information into Hadoop file status.
- *
- * @param file IGFS file information.
- * @return Hadoop file status.
- */
- private FileStatus convert(IgfsFile file) {
- return new FileStatus(
- file.length(),
- file.isDirectory(),
- dfltReplication,
- file.groupBlockSize(),
- file.modificationTime(),
- file.accessTime(),
- permission(file),
- file.property(IgfsUtils.PROP_USER_NAME, user),
- file.property(IgfsUtils.PROP_GROUP_NAME, "users"),
- convert(file.path())) {
- @Override public String toString() {
- return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]";
- }
- };
- }
-
- /**
- * Convert Hadoop permission into IGFS file attribute.
- *
- * @param perm Hadoop permission.
- * @return IGFS attributes.
- */
- private Map<String, String> permission(FsPermission perm) {
- if (perm == null)
- perm = FsPermission.getDefault();
-
- return F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm));
- }
-
- /**
- * @param perm Permission.
- * @return String.
- */
- private static String toString(FsPermission perm) {
- return String.format("%04o", perm.toShort());
- }
-
- /**
- * Convert IGFS file attributes into Hadoop permission.
- *
- * @param file File info.
- * @return Hadoop permission.
- */
- private FsPermission permission(IgfsFile file) {
- String perm = file.property(IgfsUtils.PROP_PERMISSION, null);
-
- if (perm == null)
- return FsPermission.getDefault();
-
- try {
- return new FsPermission((short)Integer.parseInt(perm, 8));
- }
- catch (NumberFormatException ignore) {
- return FsPermission.getDefault();
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgniteHadoopFileSystem.class, this);
- }
-
- /**
- * Returns the user name this File System is created on behalf of.
- * @return the user name
- */
- public String user() {
- return user;
- }
-
- /**
- * Gets cached or creates a {@link FileSystem}.
- *
- * @return The secondary file system.
- */
- private FileSystem secondaryFileSystem() throws IOException{
- assert factory != null;
-
- return factory.get(user);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
deleted file mode 100644
index d8e70d1..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <!-- Package description. -->
- * Contains Ignite Hadoop 2.x <code>FileSystem</code> implementation.
- */
-package org.apache.ignite.hadoop.fs.v2;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
deleted file mode 100644
index 583af35..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.client.GridClient;
-import org.apache.ignite.internal.client.GridClientConfiguration;
-import org.apache.ignite.internal.client.GridClientException;
-import org.apache.ignite.internal.client.GridClientFactory;
-import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
-import org.apache.ignite.internal.processors.hadoop.proto.HadoopClientProtocol;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
-
-import static org.apache.ignite.internal.client.GridClientProtocol.TCP;
-
-
-/**
- * Ignite Hadoop client protocol provider.
- */
-public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
- /** Framework name used in configuration. */
- public static final String FRAMEWORK_NAME = "ignite";
-
- /** Clients. */
- private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>();
-
- /** {@inheritDoc} */
- @Override public ClientProtocol create(Configuration conf) throws IOException {
- if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
- String addr = conf.get(MRConfig.MASTER_ADDRESS);
-
- if (F.isEmpty(addr))
- throw new IOException("Failed to create client protocol because server address is not specified (is " +
- MRConfig.MASTER_ADDRESS + " property set?).");
-
- if (F.eq(addr, "local"))
- throw new IOException("Local execution mode is not supported, please point " +
- MRConfig.MASTER_ADDRESS + " to real Ignite node.");
-
- return createProtocol(addr, conf);
- }
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
- if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME)))
- return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf);
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void close(ClientProtocol cliProto) throws IOException {
- // No-op.
- }
-
- /**
- * Internal protocol creation routine.
- *
- * @param addr Address.
- * @param conf Configuration.
- * @return Client protocol.
- * @throws IOException If failed.
- */
- private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException {
- return new HadoopClientProtocol(conf, client(addr));
- }
-
- /**
- * Create client.
- *
- * @param addr Endpoint address.
- * @return Client.
- * @throws IOException If failed.
- */
- private static GridClient client(String addr) throws IOException {
- try {
- IgniteInternalFuture<GridClient> fut = cliMap.get(addr);
-
- if (fut == null) {
- GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>();
-
- IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0);
-
- if (oldFut != null)
- return oldFut.get();
- else {
- GridClientConfiguration cliCfg = new GridClientConfiguration();
-
- cliCfg.setProtocol(TCP);
- cliCfg.setServers(Collections.singletonList(addr));
- cliCfg.setMarshaller(new GridClientJdkMarshaller());
- cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day.
- cliCfg.setDaemon(true);
-
- try {
- GridClient cli = GridClientFactory.start(cliCfg);
-
- fut0.onDone(cli);
-
- return cli;
- }
- catch (GridClientException e) {
- fut0.onDone(e);
-
- throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
- }
- }
- }
- else
- return fut.get();
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
index d4a44fa..e1101c5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
@@ -17,16 +17,6 @@
package org.apache.ignite.hadoop.mapreduce;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.UUID;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -38,13 +28,23 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.UUID;
+
import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 27ffc19..2d1ac0b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -24,11 +24,11 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
@@ -116,7 +116,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
/** {@inheritDoc} */
@Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
@Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
- List<HadoopInputSplit> splits = HadoopUtils.sortInputSplits(job.input());
+ List<HadoopInputSplit> splits = HadoopCommonUtils.sortInputSplits(job.input());
int reducerCnt = job.info().reducers();
if (reducerCnt < 0)
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
deleted file mode 100644
index 7635b9e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <!-- Package description. -->
- * Ignite Hadoop Accelerator map-reduce classes.
- */
-package org.apache.ignite.hadoop.mapreduce;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
index 26dc4b2..12669aa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
@@ -17,14 +17,12 @@
package org.apache.ignite.hadoop.util;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
import org.jetbrains.annotations.Nullable;
import java.io.Serializable;
/**
- * Hadoop file system name mapper. Used by {@link HadoopFileSystemFactory} implementation to pass proper user names
- * to the underlying Hadoop file system.
+ * Hadoop file system name mapper. Ensures that correct user name is passed to the underlying Hadoop file system.
*/
public interface UserNameMapper extends Serializable {
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
deleted file mode 100644
index 23eaa18..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Arrays;
-
-/**
- * Hadoop attributes.
- */
-public class HadoopAttributes implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Attribute name. */
- public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + ".hadoop";
-
- /** Map-reduce planner class name. */
- private String plannerCls;
-
- /** External executor flag. */
- private boolean extExec;
-
- /** Maximum parallel tasks. */
- private int maxParallelTasks;
-
- /** Maximum task queue size. */
- private int maxTaskQueueSize;
-
- /** Library names. */
- @GridToStringExclude
- private String[] libNames;
-
- /** Number of cores. */
- private int cores;
-
- /**
- * Get attributes for node (if any).
- *
- * @param node Node.
- * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node.
- */
- @Nullable public static HadoopAttributes forNode(ClusterNode node) {
- return node.attribute(NAME);
- }
-
- /**
- * {@link Externalizable} support.
- */
- public HadoopAttributes() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param cfg Configuration.
- */
- public HadoopAttributes(HadoopConfiguration cfg) {
- assert cfg != null;
- assert cfg.getMapReducePlanner() != null;
-
- plannerCls = cfg.getMapReducePlanner().getClass().getName();
-
- // TODO: IGNITE-404: Get from configuration when fixed.
- extExec = false;
-
- maxParallelTasks = cfg.getMaxParallelTasks();
- maxTaskQueueSize = cfg.getMaxTaskQueueSize();
- libNames = cfg.getNativeLibraryNames();
-
- // Cores count already passed in other attributes, we add it here for convenience.
- cores = Runtime.getRuntime().availableProcessors();
- }
-
- /**
- * @return Map reduce planner class name.
- */
- public String plannerClassName() {
- return plannerCls;
- }
-
- /**
- * @return External execution flag.
- */
- public boolean externalExecution() {
- return extExec;
- }
-
- /**
- * @return Maximum parallel tasks.
- */
- public int maxParallelTasks() {
- return maxParallelTasks;
- }
-
- /**
- * @return Maximum task queue size.
- */
- public int maxTaskQueueSize() {
- return maxTaskQueueSize;
- }
-
-
- /**
- * @return Native library names.
- */
- public String[] nativeLibraryNames() {
- return libNames;
- }
-
- /**
- * @return Number of cores on machine.
- */
- public int cores() {
- return cores;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(plannerCls);
- out.writeBoolean(extExec);
- out.writeInt(maxParallelTasks);
- out.writeInt(maxTaskQueueSize);
- out.writeObject(libNames);
- out.writeInt(cores);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- plannerCls = (String)in.readObject();
- extExec = in.readBoolean();
- maxParallelTasks = in.readInt();
- maxTaskQueueSize = in.readInt();
- libNames = (String[])in.readObject();
- cores = in.readInt();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopAttributes.class, this, "libNames", Arrays.toString(libNames));
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
new file mode 100644
index 0000000..83f94ce
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeSet;
+
+/**
+ * Common Hadoop utility methods which do not depend on Hadoop API.
+ */
+public class HadoopCommonUtils {
+ /**
+ * Sort input splits by length.
+ *
+ * @param splits Splits.
+ * @return Sorted splits.
+ */
+ public static List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> splits) {
+ int id = 0;
+
+ TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>();
+
+ for (HadoopInputSplit split : splits) {
+ long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0;
+
+ sortedSplits.add(new SplitSortWrapper(id++, split, len));
+ }
+
+ ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size());
+
+ for (SplitSortWrapper sortedSplit : sortedSplits)
+ res.add(sortedSplit.split);
+
+ return res;
+ }
+
+ /**
+ * Split wrapper for sorting.
+ */
+ private static class SplitSortWrapper implements Comparable<SplitSortWrapper> {
+ /** Unique ID. */
+ private final int id;
+
+ /** Split. */
+ private final HadoopInputSplit split;
+
+ /** Split length. */
+ private final long len;
+
+ /**
+ * Constructor.
+ *
+ * @param id Unique ID.
+ * @param split Split.
+ * @param len Split length.
+ */
+ public SplitSortWrapper(int id, HadoopInputSplit split, long len) {
+ this.id = id;
+ this.split = split;
+ this.len = len;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("NullableProblems")
+ @Override public int compareTo(SplitSortWrapper other) {
+ long res = len - other.len;
+
+ if (res > 0)
+ return -1;
+ else if (res < 0)
+ return 1;
+ else
+ return id - other.id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id;
+ }
+ }
+
+ /**
+ * Private constructor.
+ */
+ private HadoopCommonUtils() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
deleted file mode 100644
index aeda5c0..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-
-/**
- * Abstract class for all hadoop components.
- */
-public abstract class HadoopComponent {
- /** Hadoop context. */
- protected HadoopContext ctx;
-
- /** Logger. */
- protected IgniteLogger log;
-
- /**
- * @param ctx Hadoop context.
- */
- public void start(HadoopContext ctx) throws IgniteCheckedException {
- this.ctx = ctx;
-
- log = ctx.kernalContext().log(getClass());
- }
-
- /**
- * Stops manager.
- */
- public void stop(boolean cancel) {
- // No-op.
- }
-
- /**
- * Callback invoked when all grid components are started.
- */
- public void onKernalStart() throws IgniteCheckedException {
- // No-op.
- }
-
- /**
- * Callback invoked before all grid components are stopped.
- */
- public void onKernalStop(boolean cancel) {
- // No-op.
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
deleted file mode 100644
index 42a3d72..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
-import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskExecutorAdapter;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-
-/**
- * Hadoop accelerator context.
- */
-public class HadoopContext {
- /** Kernal context. */
- private GridKernalContext ctx;
-
- /** Hadoop configuration. */
- private HadoopConfiguration cfg;
-
- /** Job tracker. */
- private HadoopJobTracker jobTracker;
-
- /** External task executor. */
- private HadoopTaskExecutorAdapter taskExecutor;
-
- /** */
- private HadoopShuffle shuffle;
-
- /** Managers list. */
- private List<HadoopComponent> components = new ArrayList<>();
-
- /**
- * @param ctx Kernal context.
- */
- public HadoopContext(
- GridKernalContext ctx,
- HadoopConfiguration cfg,
- HadoopJobTracker jobTracker,
- HadoopTaskExecutorAdapter taskExecutor,
- HadoopShuffle shuffle
- ) {
- this.ctx = ctx;
- this.cfg = cfg;
-
- this.jobTracker = add(jobTracker);
- this.taskExecutor = add(taskExecutor);
- this.shuffle = add(shuffle);
- }
-
- /**
- * Gets list of managers.
- *
- * @return List of managers.
- */
- public List<HadoopComponent> components() {
- return components;
- }
-
- /**
- * Gets kernal context.
- *
- * @return Grid kernal context instance.
- */
- public GridKernalContext kernalContext() {
- return ctx;
- }
-
- /**
- * Gets Hadoop configuration.
- *
- * @return Hadoop configuration.
- */
- public HadoopConfiguration configuration() {
- return cfg;
- }
-
- /**
- * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}.
- *
- * @return Local node ID.
- */
- public UUID localNodeId() {
- return ctx.localNodeId();
- }
-
- /**
- * Gets local node order.
- *
- * @return Local node order.
- */
- public long localNodeOrder() {
- assert ctx.discovery() != null;
-
- return ctx.discovery().localNode().order();
- }
-
- /**
- * @return Hadoop-enabled nodes.
- */
- public Collection<ClusterNode> nodes() {
- return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersionEx());
- }
-
- /**
- * @return {@code True} if
- */
- public boolean jobUpdateLeader() {
- long minOrder = Long.MAX_VALUE;
- ClusterNode minOrderNode = null;
-
- for (ClusterNode node : nodes()) {
- if (node.order() < minOrder) {
- minOrder = node.order();
- minOrderNode = node;
- }
- }
-
- assert minOrderNode != null;
-
- return localNodeId().equals(minOrderNode.id());
- }
-
- /**
- * @param meta Job metadata.
- * @return {@code true} If local node is participating in job execution.
- */
- public boolean isParticipating(HadoopJobMetadata meta) {
- UUID locNodeId = localNodeId();
-
- if (locNodeId.equals(meta.submitNodeId()))
- return true;
-
- HadoopMapReducePlan plan = meta.mapReducePlan();
-
- return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader();
- }
-
- /**
- * @return Jon tracker instance.
- */
- public HadoopJobTracker jobTracker() {
- return jobTracker;
- }
-
- /**
- * @return Task executor.
- */
- public HadoopTaskExecutorAdapter taskExecutor() {
- return taskExecutor;
- }
-
- /**
- * @return Shuffle.
- */
- public HadoopShuffle shuffle() {
- return shuffle;
- }
-
- /**
- * @return Map-reduce planner.
- */
- public HadoopMapReducePlanner planner() {
- return cfg.getMapReducePlanner();
- }
-
- /**
- * Adds component.
- *
- * @param c Component to add.
- * @return Added manager.
- */
- private <C extends HadoopComponent> C add(C c) {
- components.add(c);
-
- return c;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
deleted file mode 100644
index ae17ac8..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop job info based on default Hadoop configuration.
- */
-public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
- /** */
- private static final long serialVersionUID = 5489900236464999951L;
-
- /** {@code true} If job has combiner. */
- private boolean hasCombiner;
-
- /** Number of reducers configured for job. */
- private int numReduces;
-
- /** Configuration. */
- private Map<String,String> props = new HashMap<>();
-
- /** Job name. */
- private String jobName;
-
- /** User name. */
- private String user;
-
- /**
- * Default constructor required by {@link Externalizable}.
- */
- public HadoopDefaultJobInfo() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param jobName Job name.
- * @param user User name.
- * @param hasCombiner {@code true} If job has combiner.
- * @param numReduces Number of reducers configured for job.
- * @param props All other properties of the job.
- */
- public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces,
- Map<String, String> props) {
- this.jobName = jobName;
- this.user = user;
- this.hasCombiner = hasCombiner;
- this.numReduces = numReduces;
- this.props = props;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public String property(String name) {
- return props.get(name);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
- @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException {
- assert jobCls != null;
-
- try {
- Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class,
- HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class, HadoopHelper.class);
-
- return constructor.newInstance(jobId, this, log, libNames, helper);
- }
- catch (Throwable t) {
- if (t instanceof Error)
- throw (Error)t;
-
- throw new IgniteCheckedException(t);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasCombiner() {
- return hasCombiner;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasReducer() {
- return reducers() > 0;
- }
-
- /** {@inheritDoc} */
- @Override public int reducers() {
- return numReduces;
- }
-
- /** {@inheritDoc} */
- @Override public String jobName() {
- return jobName;
- }
-
- /** {@inheritDoc} */
- @Override public String user() {
- return user;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, jobName);
- U.writeString(out, user);
-
- out.writeBoolean(hasCombiner);
- out.writeInt(numReduces);
-
- U.writeStringMap(out, props);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobName = U.readString(in);
- user = U.readString(in);
-
- hasCombiner = in.readBoolean();
- numReduces = in.readInt();
-
- props = U.readStringMap(in);
- }
-
- /**
- * @return Properties of the job.
- */
- public Map<String, String> properties() {
- return props;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
deleted file mode 100644
index ed2657e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop facade implementation.
- */
-public class HadoopImpl implements Hadoop {
- /** Hadoop processor. */
- private final HadoopProcessor proc;
-
- /** Busy lock. */
- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
- /**
- * Constructor.
- *
- * @param proc Hadoop processor.
- */
- HadoopImpl(HadoopProcessor proc) {
- this.proc = proc;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopConfiguration configuration() {
- return proc.config();
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJobId nextJobId() {
- if (busyLock.enterBusy()) {
- try {
- return proc.nextJobId();
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get next job ID (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
- if (busyLock.enterBusy()) {
- try {
- return proc.submit(jobId, jobInfo);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to submit job (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.status(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get job status (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.counters(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get job counters (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.finishFuture(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get job finish future (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.kill(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to kill job (grid is stopping).");
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
deleted file mode 100644
index 4e03e17..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
-
-/**
- * Hadoop +counter group adapter.
- */
-class HadoopMapReduceCounterGroup implements CounterGroup {
- /** Counters. */
- private final HadoopMapReduceCounters cntrs;
-
- /** Group name. */
- private final String name;
-
- /**
- * Creates new instance.
- *
- * @param cntrs Client counters instance.
- * @param name Group name.
- */
- HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
- this.cntrs = cntrs;
- this.name = name;
- }
-
- /** {@inheritDoc} */
- @Override public String getName() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override public String getDisplayName() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override public void setDisplayName(String displayName) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void addCounter(Counter counter) {
- addCounter(counter.getName(), counter.getDisplayName(), 0);
- }
-
- /** {@inheritDoc} */
- @Override public Counter addCounter(String name, String displayName, long value) {
- final Counter counter = cntrs.findCounter(this.name, name);
-
- counter.setValue(value);
-
- return counter;
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String counterName, String displayName) {
- return cntrs.findCounter(name, counterName);
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String counterName, boolean create) {
- return cntrs.findCounter(name, counterName, create);
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String counterName) {
- return cntrs.findCounter(name, counterName);
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return cntrs.groupSize(name);
- }
-
- /** {@inheritDoc} */
- @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
- for (final Counter counter : rightGroup)
- cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
- }
-
- /** {@inheritDoc} */
- @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<Counter> iterator() {
- return cntrs.iterateGroup(name);
- }
-
- /** {@inheritDoc} */
- @Override public void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} */
- @Override public void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-}
\ No newline at end of file