You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2016/09/26 12:13:56 UTC
[42/50] ignite git commit: IGNITE-3912: Hadoop: Implemented new class
loading architecture for embedded execution mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
deleted file mode 100644
index 9902142..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
+++ /dev/null
@@ -1,524 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsException;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathSummary;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.igfs.common.IgfsControlResponse;
-import org.apache.ignite.internal.igfs.common.IgfsHandshakeRequest;
-import org.apache.ignite.internal.igfs.common.IgfsMessage;
-import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest;
-import org.apache.ignite.internal.igfs.common.IgfsStatusRequest;
-import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest;
-import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsInputStreamDescriptor;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.lang.IgniteClosure;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
-
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.AFFINITY;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.CLOSE;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.DELETE;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.INFO;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_FILES;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_PATHS;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.MAKE_DIRECTORIES;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_APPEND;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_CREATE;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_READ;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.PATH_SUMMARY;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.READ_BLOCK;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.RENAME;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.SET_TIMES;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.UPDATE;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.WRITE_BLOCK;
-
-/**
- * Communication with external process (TCP or shmem).
- */
-public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener {
- /** Expected result is boolean. */
- private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure();
-
- /** Expected result is long. */
- private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Long> LONG_RES = createClosure();
-
- /** Expected result is {@code IgfsFile}. */
- private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure();
-
- /** Expected result is {@code IgfsHandshakeResponse} */
- private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
- IgfsHandshakeResponse> HANDSHAKE_RES = createClosure();
-
- /** Expected result is {@code IgfsStatus} */
- private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsStatus> STATUS_RES =
- createClosure();
-
- /** Expected result is {@code IgfsInputStreamDescriptor}. */
- private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
- IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure();
-
- /** Expected result is a collection of {@code IgfsFile}. */
- private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
- Collection<IgfsFile>> FILE_COL_RES = createClosure();
-
- /** Expected result is a collection of {@code IgfsPath}. */
- private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
- Collection<IgfsPath>> PATH_COL_RES = createClosure();
-
- /** Expected result is {@code IgfsPathSummary}. */
- private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES =
- createClosure();
-
- /** Expected result is a collection of {@code IgfsBlockLocation}. */
- private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
- Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure();
-
- /** Grid name. */
- private final String grid;
-
- /** IGFS name. */
- private final String igfs;
-
- /** The user this out proc is performing on behalf of. */
- private final String userName;
-
- /** Client log. */
- private final Log log;
-
- /** Client IO. */
- private final HadoopIgfsIpcIo io;
-
- /** Event listeners. */
- private final Map<Long, HadoopIgfsStreamEventListener> lsnrs = new ConcurrentHashMap8<>();
-
- /**
- * Constructor for TCP endpoint.
- *
- * @param host Host.
- * @param port Port.
- * @param grid Grid name.
- * @param igfs IGFS name.
- * @param log Client logger.
- * @param user User name the requests are performed on behalf of.
- * @throws IOException If failed.
- */
- public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException {
- this(host, port, grid, igfs, false, log, user);
- }
-
- /**
- * Constructor for shmem endpoint.
- *
- * @param port Port.
- * @param grid Grid name.
- * @param igfs IGFS name.
- * @param log Client logger.
- * @param user User name the requests are performed on behalf of.
- * @throws IOException If failed.
- */
- public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException {
- this(null, port, grid, igfs, true, log, user);
- }
-
- /**
- * Constructor.
- *
- * @param host Host ({@code null} for a shared memory endpoint).
- * @param port Port.
- * @param grid Grid name.
- * @param igfs IGFS name.
- * @param shmem Shared memory flag.
- * @param log Client logger.
- * @param user User name the requests are performed on behalf of; stored after
- * {@code IgfsUtils.fixUserName(user)} is applied.
- * @throws IOException If failed.
- */
- private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user)
- throws IOException {
- // Exactly one of the two modes is allowed: a host for TCP, or no host together with the shmem flag.
- assert host != null && !shmem || host == null && shmem :
- "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
-
- // Endpoint string is "host:port" for TCP and "shmem:port" for shared memory.
- String endpoint = host != null ? host + ":" + port : "shmem:" + port;
-
- this.grid = grid;
- this.igfs = igfs;
- this.log = log;
- this.userName = IgfsUtils.fixUserName(user);
-
- io = HadoopIgfsIpcIo.get(log, endpoint);
-
- // Subscribe to IO events so that onClose()/onError() of this instance get invoked.
- io.addEventListener(this);
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException {
- final IgfsHandshakeRequest req = new IgfsHandshakeRequest();
-
- req.gridName(grid);
- req.igfsName(igfs);
- req.logDirectory(logDir);
-
- return io.send(req).chain(HANDSHAKE_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public void close(boolean force) {
- assert io != null;
-
- io.removeEventListener(this);
-
- if (force)
- io.forceClose();
- else
- io.release();
- }
-
- /** {@inheritDoc} */
- @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(INFO);
- msg.path(path);
- msg.userName(userName);
-
- return io.send(msg).chain(FILE_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(UPDATE);
- msg.path(path);
- msg.properties(props);
- msg.userName(userName);
-
- return io.send(msg).chain(FILE_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(SET_TIMES);
- msg.path(path);
- msg.accessTime(accessTime);
- msg.modificationTime(modificationTime);
- msg.userName(userName);
-
- return io.send(msg).chain(BOOL_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(RENAME);
- msg.path(src);
- msg.destinationPath(dest);
- msg.userName(userName);
-
- return io.send(msg).chain(BOOL_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(DELETE);
- msg.path(path);
- msg.flag(recursive);
- msg.userName(userName);
-
- return io.send(msg).chain(BOOL_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
- throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(AFFINITY);
- msg.path(path);
- msg.start(start);
- msg.length(len);
- msg.userName(userName);
-
- return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(PATH_SUMMARY);
- msg.path(path);
- msg.userName(userName);
-
- return io.send(msg).chain(SUMMARY_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(MAKE_DIRECTORIES);
- msg.path(path);
- msg.properties(props);
- msg.userName(userName);
-
- return io.send(msg).chain(BOOL_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(LIST_FILES);
- msg.path(path);
- msg.userName(userName);
-
- return io.send(msg).chain(FILE_COL_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(LIST_PATHS);
- msg.path(path);
- msg.userName(userName);
-
- return io.send(msg).chain(PATH_COL_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
- return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get();
- }
-
- /** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(OPEN_READ);
- msg.path(path);
- msg.flag(false);
- msg.userName(userName);
-
- IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
-
- return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
- }
-
- /** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate open(IgfsPath path,
- int seqReadsBeforePrefetch) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(OPEN_READ);
- msg.path(path);
- msg.flag(true);
- msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
- msg.userName(userName);
-
- IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
-
- return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
- }
-
- /** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
- int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(OPEN_CREATE);
- msg.path(path);
- msg.flag(overwrite);
- msg.colocate(colocate);
- msg.properties(props);
- msg.replication(replication);
- msg.blockSize(blockSize);
- msg.userName(userName);
-
- Long streamId = io.send(msg).chain(LONG_RES).get();
-
- return new HadoopIgfsStreamDelegate(this, streamId);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
- @Nullable Map<String, String> props) throws IgniteCheckedException {
- final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
- msg.command(OPEN_APPEND);
- msg.path(path);
- msg.flag(create);
- msg.properties(props);
- msg.userName(userName);
-
- Long streamId = io.send(msg).chain(LONG_RES).get();
-
- return new HadoopIgfsStreamDelegate(this, streamId);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len,
- final @Nullable byte[] outBuf, final int outOff, final int outLen) {
- assert len > 0;
-
- final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
-
- msg.command(READ_BLOCK);
- msg.streamId((long) desc.target());
- msg.position(pos);
- msg.length(len);
-
- try {
- return io.send(msg, outBuf, outOff, outLen);
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Builds a {@code WRITE_BLOCK} request for the stream identified by {@code desc.target()} and sends it
- * through {@code io.sendPlain(...)}; the result of that call is not used here.
- */
- @Override public void writeData(HadoopIgfsStreamDelegate desc, byte[] data, int off, int len)
- throws IOException {
- final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
-
- msg.command(WRITE_BLOCK);
- msg.streamId((long) desc.target());
- msg.data(data);
- // NOTE(review): the array offset is transferred via position(); presumably the receiver interprets
- // it as the offset into 'data' rather than a file position - confirm against the message marshaller.
- msg.position(off);
- msg.length(len);
-
- try {
- io.sendPlain(msg);
- }
- catch (IgniteCheckedException e) {
- // Convert to the IOException declared by this method.
- throw HadoopIgfsUtils.cast(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException {
- final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
-
- msg.command(CLOSE);
- msg.streamId((long)desc.target());
-
- try {
- io.send(msg).chain(BOOL_RES).get();
- }
- catch (IgniteCheckedException e) {
- throw HadoopIgfsUtils.cast(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void addEventListener(HadoopIgfsStreamDelegate desc,
- HadoopIgfsStreamEventListener lsnr) {
- long streamId = desc.target();
-
- HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr);
-
- assert lsnr0 == null || lsnr0 == lsnr;
-
- if (log.isDebugEnabled())
- log.debug("Added stream event listener [streamId=" + streamId + ']');
- }
-
- /** {@inheritDoc} */
- @Override public void removeEventListener(HadoopIgfsStreamDelegate desc) {
- long streamId = desc.target();
-
- HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(streamId);
-
- if (lsnr0 != null && log.isDebugEnabled())
- log.debug("Removed stream event listener [streamId=" + streamId + ']');
- }
-
- /** {@inheritDoc} */
- @Override public void onClose() {
- for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) {
- try {
- lsnr.onClose();
- }
- catch (IgniteCheckedException e) {
- log.warn("Got exception from stream event listener (will ignore): " + lsnr, e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onError(long streamId, String errMsg) {
- HadoopIgfsStreamEventListener lsnr = lsnrs.get(streamId);
-
- if (lsnr != null)
- lsnr.onError(errMsg);
- else
- log.warn("Received write error response for not registered output stream (will ignore) " +
- "[streamId= " + streamId + ']');
- }
-
- /**
- * Creates conversion closure for given type.
- * <p>
- * The returned closure takes the completed future, casts its result to {@code IgfsControlResponse},
- * rethrows the error carried by the response (if any) and otherwise returns the response payload cast
- * to {@code T}. The cast to {@code T} is unchecked, so the caller is responsible for chaining the
- * closure constant that matches the command being sent.
- *
- * @param <T> Type of expected result.
- * @return Conversion closure.
- */
- @SuppressWarnings("unchecked")
- private static <T> IgniteClosure<IgniteInternalFuture<IgfsMessage>, T> createClosure() {
- return new IgniteClosure<IgniteInternalFuture<IgfsMessage>, T>() {
- @Override public T apply(IgniteInternalFuture<IgfsMessage> fut) {
- try {
- IgfsControlResponse res = (IgfsControlResponse)fut.get();
-
- if (res.hasError())
- res.throwError();
-
- return (T)res.response();
- }
- catch (IgfsException | IgniteCheckedException e) {
- // A closure cannot throw checked exceptions, so wrap into the unchecked closure exception.
- throw new GridClosureException(e);
- }
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public String user() {
- return userName;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java
deleted file mode 100644
index 8f7458b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import org.apache.commons.logging.Log;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * IGFS Hadoop output stream implementation.
- * <p>
- * Data is forwarded to the server-side stream through the stream delegate. The instance registers itself
- * as a stream event listener in the constructor and unregisters when it is marked closed. Errors reported
- * through {@link #onError(String)} and connection loss reported through {@link #onClose()} surface as
- * {@link IOException}s on subsequent writes.
- */
-public class HadoopIgfsOutputStream extends OutputStream implements HadoopIgfsStreamEventListener {
-    /** Log instance. */
-    private final Log log;
-
-    /** Client logger. */
-    private final IgfsLogger clientLog;
-
-    /** Log stream ID. */
-    private final long logStreamId;
-
-    /** Server stream delegate. */
-    private final HadoopIgfsStreamDelegate delegate;
-
-    /** Closed flag. */
-    private volatile boolean closed;
-
-    /**
-     * Flag set if stream was closed due to connection breakage. Always written before the volatile
-     * {@link #closed} flag and read after it, so readers that observe {@code closed == true} also
-     * observe the matching value of this flag.
-     */
-    private boolean connBroken;
-
-    /** Error message. */
-    private volatile String errMsg;
-
-    /** Write time: nanoseconds accumulated between {@link #writeStart()} and {@link #writeEnd()}. */
-    private long writeTime;
-
-    /** User time: nanoseconds accumulated outside of write operations. */
-    private long userTime;
-
-    /** Last timestamp as returned by {@link System#nanoTime()}. */
-    private long lastTs;
-
-    /** Amount of written bytes. */
-    private long total;
-
-    /**
-     * Creates light output stream.
-     *
-     * @param delegate Server stream delegate.
-     * @param log Logger to use.
-     * @param clientLog Client logger.
-     * @param logStreamId Log stream ID.
-     */
-    public HadoopIgfsOutputStream(HadoopIgfsStreamDelegate delegate, Log log,
-        IgfsLogger clientLog, long logStreamId) {
-        this.delegate = delegate;
-        this.log = log;
-        this.clientLog = clientLog;
-        this.logStreamId = logStreamId;
-
-        lastTs = System.nanoTime();
-
-        delegate.hadoop().addEventListener(delegate, this);
-    }
-
-    /**
-     * Write start: attributes the time elapsed since the last timestamp to user time.
-     */
-    private void writeStart() {
-        long now = System.nanoTime();
-
-        userTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /**
-     * Write end: attributes the time elapsed since the last timestamp to write time.
-     */
-    private void writeEnd() {
-        long now = System.nanoTime();
-
-        writeTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(@NotNull byte[] b, int off, int len) throws IOException {
-        check();
-
-        writeStart();
-
-        try {
-            delegate.hadoop().writeData(delegate, b, off, len);
-
-            total += len;
-        }
-        finally {
-            writeEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(int b) throws IOException {
-        // OutputStream.write(byte[]) dispatches to write(byte[], int, int) above, which already adds
-        // the byte to 'total'; incrementing it here as well would count every byte twice.
-        write(new byte[] {(byte)b});
-    }
-
-    /** {@inheritDoc} */
-    @Override public void flush() throws IOException {
-        delegate.hadoop().flush(delegate);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Closes the server-side stream on the first call. Repeated calls are no-ops unless the stream was
-     * closed because of a broken connection, in which case an exception is thrown.
-     *
-     * @throws IOException If the server-side close failed or the connection was broken earlier.
-     */
-    @Override public void close() throws IOException {
-        if (!closed) {
-            if (log.isDebugEnabled())
-                log.debug("Closing output stream: " + delegate);
-
-            writeStart();
-
-            delegate.hadoop().closeStream(delegate);
-
-            markClosed(false);
-
-            writeEnd();
-
-            if (clientLog.isLogEnabled())
-                clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
-
-            if (log.isDebugEnabled())
-                log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 +
-                    ", userTime=" + userTime / 1000 + ']');
-        }
-        else if(connBroken)
-            throw new IOException(
-                "Failed to close stream, because connection was broken (data could have been lost).");
-    }
-
-    /**
-     * Marks stream as closed.
-     *
-     * @param connBroken {@code True} if connection with server was lost.
-     */
-    private void markClosed(boolean connBroken) {
-        // It is ok to have race here.
-        if (!closed) {
-            // Publish the reason before the volatile 'closed' write so that a thread which sees
-            // closed == true never reports "Stream is closed." for a broken connection.
-            this.connBroken = connBroken;
-
-            closed = true;
-
-            delegate.hadoop().removeEventListener(delegate);
-        }
-    }
-
-    /**
-     * Verifies that the stream can still be written to.
-     *
-     * @throws IOException If an error was reported for the stream, or the stream is closed.
-     */
-    private void check() throws IOException {
-        String errMsg0 = errMsg;
-
-        if (errMsg0 != null)
-            throw new IOException(errMsg0);
-
-        if (closed) {
-            if (connBroken)
-                throw new IOException("Server connection was lost.");
-            else
-                throw new IOException("Stream is closed.");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onClose() throws IgniteCheckedException {
-        markClosed(true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onError(String errMsg) {
-        this.errMsg = errMsg;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
deleted file mode 100644
index 90f6bca..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.util.Map;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-
-/**
- * Hadoop file system properties.
- * <p>
- * Immutable view of the user name, group name and permission extracted from an IGFS properties map.
- */
-public class HadoopIgfsProperties {
-    /** Username. */
-    private final String usrName;
-
-    /** Group name. */
-    private final String grpName;
-
-    /** Permissions ({@code null} if the permission property is absent). */
-    private final FsPermission perm;
-
-    /**
-     * Constructor.
-     *
-     * @param props Properties.
-     * @throws IgniteException If the permission property is present but is not a valid octal number.
-     */
-    public HadoopIgfsProperties(Map<String, String> props) throws IgniteException {
-        usrName = props.get(IgfsUtils.PROP_USER_NAME);
-        grpName = props.get(IgfsUtils.PROP_GROUP_NAME);
-
-        String permStr = props.get(IgfsUtils.PROP_PERMISSION);
-
-        perm = permStr != null ? parsePermission(permStr) : null;
-    }
-
-    /**
-     * Parses octal permission string.
-     *
-     * @param permStr Permission string in octal notation.
-     * @return Parsed permission.
-     * @throws IgniteException If the string is not a valid octal number.
-     */
-    private static FsPermission parsePermission(String permStr) throws IgniteException {
-        try {
-            return new FsPermission((short)Integer.parseInt(permStr, 8));
-        }
-        catch (NumberFormatException e) {
-            // Keep the original exception as the cause so the parsing failure is not lost.
-            throw new IgniteException("Permissions cannot be parsed: " + permStr, e);
-        }
-    }
-
-    /**
-     * Get user name.
-     *
-     * @return User name.
-     */
-    public String userName() {
-        return usrName;
-    }
-
-    /**
-     * Get group name.
-     *
-     * @return Group name.
-     */
-    public String groupName() {
-        return grpName;
-    }
-
-    /**
-     * Get permission.
-     *
-     * @return Permission.
-     */
-    public FsPermission permission() {
-        return perm;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
deleted file mode 100644
index 5cee947..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.ignite.internal.igfs.common.IgfsLogger;
-
-/**
- * Secondary Hadoop file system input stream wrapper.
- */
-public class HadoopIgfsProxyInputStream extends InputStream implements Seekable, PositionedReadable {
- /** Actual input stream to the secondary file system. */
- private final FSDataInputStream is;
-
- /** Client logger. */
- private final IgfsLogger clientLog;
-
- /** Log stream ID. */
- private final long logStreamId;
-
- /** Read time. */
- private long readTime;
-
- /** User time. */
- private long userTime;
-
- /** Last timestamp. */
- private long lastTs;
-
- /** Amount of read bytes. */
- private long total;
-
- /** Closed flag. */
- private boolean closed;
-
- /**
- * Constructor.
- *
- * @param is Actual input stream to the secondary file system.
- * @param clientLog Client log.
- * @param logStreamId Log stream ID.
- */
- public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) {
- assert is != null;
- assert clientLog != null;
-
- this.is = is;
- this.clientLog = clientLog;
- this.logStreamId = logStreamId;
-
- // Starting point for the user/read time accounting done by readStart()/readEnd().
- lastTs = System.nanoTime();
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int read(byte[] b) throws IOException {
- readStart();
-
- int res;
-
- try {
- res = is.read(b);
- }
- finally {
- readEnd();
- }
-
- if (res != -1)
- total += res;
-
- return res;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * The read is delegated to the wrapped secondary file system stream and the number of bytes
- * actually read is added to the total.
- */
- @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
- readStart();
-
- int res;
-
- try {
- // Delegate to the wrapped stream. Calling super.read(b, off, len) here would fall into the
- // InputStream default implementation, which loops over the overridden read(): the data would
- // be fetched byte by byte, every byte would be counted in 'total' twice and the timers would
- // be re-entered for each byte.
- res = is.read(b, off, len);
- }
- finally {
- readEnd();
- }
-
- if (res != -1)
- total += res;
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized long skip(long n) throws IOException {
- readStart();
-
- long res;
-
- try {
- res = is.skip(n);
- }
- finally {
- readEnd();
- }
-
- if (clientLog.isLogEnabled())
- clientLog.logSkip(logStreamId, res);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int available() throws IOException {
- readStart();
-
- try {
- return is.available();
- }
- finally {
- readEnd();
- }
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void close() throws IOException {
- if (!closed) {
- closed = true;
-
- readStart();
-
- try {
- is.close();
- }
- finally {
- readEnd();
- }
-
- if (clientLog.isLogEnabled())
- clientLog.logCloseIn(logStreamId, userTime, readTime, total);
- }
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void mark(int readLimit) {
- readStart();
-
- try {
- is.mark(readLimit);
- }
- finally {
- readEnd();
- }
-
- if (clientLog.isLogEnabled())
- clientLog.logMark(logStreamId, readLimit);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void reset() throws IOException {
- readStart();
-
- try {
- is.reset();
- }
- finally {
- readEnd();
- }
-
- if (clientLog.isLogEnabled())
- clientLog.logReset(logStreamId);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized boolean markSupported() {
- readStart();
-
- try {
- return is.markSupported();
- }
- finally {
- readEnd();
- }
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int read() throws IOException {
- readStart();
-
- int res;
-
- try {
- res = is.read();
- }
- finally {
- readEnd();
- }
-
- if (res != -1)
- total++;
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
- readStart();
-
- int res;
-
- try {
- res = is.read(pos, buf, off, len);
- }
- finally {
- readEnd();
- }
-
- if (res != -1)
- total += res;
-
- if (clientLog.isLogEnabled())
- clientLog.logRandomRead(logStreamId, pos, res);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException {
- readStart();
-
- try {
- is.readFully(pos, buf, off, len);
- }
- finally {
- readEnd();
- }
-
- total += len;
-
- if (clientLog.isLogEnabled())
- clientLog.logRandomRead(logStreamId, pos, len);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void readFully(long pos, byte[] buf) throws IOException {
- readStart();
-
- try {
- is.readFully(pos, buf);
- }
- finally {
- readEnd();
- }
-
- total += buf.length;
-
- if (clientLog.isLogEnabled())
- clientLog.logRandomRead(logStreamId, pos, buf.length);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void seek(long pos) throws IOException {
- readStart();
-
- try {
- is.seek(pos);
- }
- finally {
- readEnd();
- }
-
- if (clientLog.isLogEnabled())
- clientLog.logSeek(logStreamId, pos);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized long getPos() throws IOException {
- readStart();
-
- try {
- return is.getPos();
- }
- finally {
- readEnd();
- }
- }
-
- /** {@inheritDoc} */
- @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException {
- readStart();
-
- try {
- return is.seekToNewSource(targetPos);
- }
- finally {
- readEnd();
- }
- }
-
- /**
- * Read start.
- */
- private void readStart() {
- long now = System.nanoTime();
-
- userTime += now - lastTs;
-
- lastTs = now;
- }
-
- /**
- * Read end.
- */
- private void readEnd() {
- long now = System.nanoTime();
-
- readTime += now - lastTs;
-
- lastTs = now;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
deleted file mode 100644
index eade0f0..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.ignite.internal.igfs.common.IgfsLogger;
-
-/**
- * Secondary Hadoop file system output stream wrapper: times delegate calls and logs totals on close.
- */
-public class HadoopIgfsProxyOutputStream extends OutputStream {
- /** Actual output stream. */
- private FSDataOutputStream os;
-
- /** Client logger. */
- private final IgfsLogger clientLog;
-
- /** Log stream ID. */
- private final long logStreamId;
-
- /** Write time: nanoseconds spent inside calls to the wrapped stream. */
- private long writeTime;
-
- /** User time: nanoseconds elapsed between calls to the wrapped stream. */
- private long userTime;
-
- /** Last timestamp ({@link System#nanoTime()} value). */
- private long lastTs;
-
- /** Amount of written bytes. */
- private long total;
-
- /** Closed flag. */
- private boolean closed;
-
- /**
- * Constructor. Also initializes the last timestamp with the current {@link System#nanoTime()}.
- *
- * @param os Actual output stream.
- * @param clientLog Client logger.
- * @param logStreamId Log stream ID.
- */
- public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) {
- assert os != null;
- assert clientLog != null;
-
- this.os = os;
- this.clientLog = clientLog;
- this.logStreamId = logStreamId;
-
- lastTs = System.nanoTime();
- }
-
- /** {@inheritDoc} Writes one byte to the wrapped stream, timing the call. */
- @Override public synchronized void write(int b) throws IOException {
- writeStart();
-
- try {
- os.write(b);
- }
- finally {
- writeEnd();
- }
-
- total++; // Counted only when the delegate call did not throw.
- }
-
- /** {@inheritDoc} Writes the whole array to the wrapped stream, timing the call. */
- @Override public synchronized void write(byte[] b) throws IOException {
- writeStart();
-
- try {
- os.write(b);
- }
- finally {
- writeEnd();
- }
-
- total += b.length;
- }
-
- /** {@inheritDoc} Writes {@code len} bytes starting at {@code off} to the wrapped stream, timing the call. */
- @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
- writeStart();
-
- try {
- os.write(b, off, len);
- }
- finally {
- writeEnd();
- }
-
- total += len;
- }
-
- /** {@inheritDoc} Flushes the wrapped stream; the call is accounted as write time. */
- @Override public synchronized void flush() throws IOException {
- writeStart();
-
- try {
- os.flush();
- }
- finally {
- writeEnd();
- }
- }
-
- /** {@inheritDoc} Only the first call closes the wrapped stream and logs; later calls do nothing. */
- @Override public synchronized void close() throws IOException {
- if (!closed) {
- closed = true; // Set before closing, so a failed close is not attempted again.
-
- writeStart();
-
- try {
- os.close();
- }
- finally {
- writeEnd();
- }
-
- if (clientLog.isLogEnabled()) // Not reached when os.close() throws.
- clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
- }
- }
-
- /**
- * Write start: time elapsed since {@code lastTs} is added to {@code userTime}.
- */
- private void writeStart() {
- long now = System.nanoTime();
-
- userTime += now - lastTs;
-
- lastTs = now;
- }
-
- /**
- * Write end: time elapsed since {@code lastTs} is added to {@code writeTime}.
- */
- private void writeEnd() {
- long now = System.nanoTime();
-
- writeTime += now - lastTs;
-
- lastTs = now;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
deleted file mode 100644
index a0577ce..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Secondary file system input stream wrapper which actually opens input stream only in case it is explicitly
- * requested.
- * <p>
- * The class is expected to be used only from synchronized context and therefore is not thread-safe.
- */
-public class HadoopIgfsSecondaryFileSystemPositionedReadable implements IgfsSecondaryFileSystemPositionedReadable {
- /** Secondary file system. */
- private final FileSystem fs;
-
- /** Path to the file to open. */
- private final Path path;
-
- /** Buffer size. */
- private final int bufSize;
-
- /** Actual input stream; stays {@code null} until the first successful open. */
- private FSDataInputStream in;
-
- /** Cached error occurred during input stream open. */
- private IOException err;
-
- /** Flag indicating that an attempt to open the stream was already made (successful or not). */
- private boolean opened;
-
- /**
- * Constructor. Does not open the stream.
- *
- * @param fs Secondary file system.
- * @param path Path to the file to open.
- * @param bufSize Buffer size.
- */
- public HadoopIgfsSecondaryFileSystemPositionedReadable(FileSystem fs, Path path, int bufSize) {
- assert fs != null;
- assert path != null;
-
- this.fs = fs;
- this.path = path;
- this.bufSize = bufSize;
- }
-
- /** Get input stream, opening it on the first call; a failed open is cached and re-thrown on later calls. */
- private PositionedReadable in() throws IOException {
- if (opened) {
- if (err != null)
- throw err; // Open is attempted only once; the same exception instance is thrown again.
- }
- else {
- opened = true;
-
- try {
- in = fs.open(path, bufSize);
-
- if (in == null) // Treated as an open failure and cached by the catch block below.
- throw new IOException("Failed to open input stream (file system returned null): " + path);
- }
- catch (IOException e) {
- err = e;
-
- throw err;
- }
- }
-
- return in;
- }
-
- /**
- * Close wrapped input stream in case it was previously opened.
- */
- @Override public void close() {
- U.closeQuiet(in); // NOTE(review): in may still be null here; presumably closeQuiet tolerates that - confirm.
- }
-
- /** {@inheritDoc} Opens the stream on first use, then performs a positioned read on it. */
- @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
- return in().read(pos, buf, off, len);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
deleted file mode 100644
index 37b58ab..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * IGFS Hadoop stream descriptor; equality and hash code are based on the identity of the target object.
- */
-public class HadoopIgfsStreamDelegate {
- /** RPC handler. */
- private final HadoopIgfsEx hadoop;
-
- /** Target. */
- private final Object target;
-
- /** Optional stream length; {@code -1} when the two-argument constructor is used. */
- private final long len;
-
- /**
- * Constructor; the length is set to {@code -1}.
- * @param hadoop RPC handler.
- * @param target Target.
- */
- public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target) {
- this(hadoop, target, -1);
- }
-
- /**
- * Constructor.
- * @param hadoop RPC handler.
- * @param target Target.
- * @param len Optional length.
- */
- public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target, long len) {
- assert hadoop != null;
- assert target != null;
-
- this.hadoop = hadoop;
- this.target = target;
- this.len = len;
- }
-
- /**
- * @return RPC handler.
- */
- public HadoopIgfsEx hadoop() {
- return hadoop;
- }
-
- /**
- * @return Stream target, cast without a check to the type expected by the caller.
- */
- @SuppressWarnings("unchecked")
- public <T> T target() {
- return (T) target;
- }
-
- /**
- * @return Length.
- */
- public long length() {
- return len;
- }
-
- /** {@inheritDoc} Identity hash code of the target, matching the identity comparison in {@code equals}. */
- @Override public int hashCode() {
- return System.identityHashCode(target);
- }
-
- /** {@inheritDoc} Two delegates are equal only when they refer to the very same target instance. */
- @Override public boolean equals(Object obj) {
- return obj != null && obj instanceof HadoopIgfsStreamDelegate &&
- target == ((HadoopIgfsStreamDelegate)obj).target;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopIgfsStreamDelegate.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
deleted file mode 100644
index d81f765..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import org.apache.ignite.IgniteCheckedException;
-
-/**
- * IGFS stream event listener: receives close and remote error notifications.
- */
-public interface HadoopIgfsStreamEventListener {
- /**
- * Callback invoked when the stream is being closed.
- *
- * @throws IgniteCheckedException If failed.
- */
- public void onClose() throws IgniteCheckedException;
-
- /**
- * Callback invoked when remote error occurs.
- *
- * @param errMsg Error message.
- */
- public void onError(String errMsg);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
deleted file mode 100644
index fa5cbc5..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.AbstractFileSystem;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathExistsException;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
-import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
-import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
-import org.apache.ignite.igfs.IgfsPathNotFoundException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Utility constants and methods for IGFS Hadoop file system.
- */
-public class HadoopIgfsUtils {
- /** Parameter name pattern for endpoint no embed mode flag ({@code %s} stands for the authority). */
- public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed";
-
- /** Parameter name pattern for endpoint no shared memory flag ({@code %s} stands for the authority). */
- public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem";
-
- /** Parameter name pattern for endpoint no local TCP flag ({@code %s} stands for the authority). */
- public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp";
-
- /**
- * Get string parameter.
- *
- * @param cfg Configuration.
- * @param name Parameter name pattern, formatted with the authority.
- * @param authority Authority; {@code null} is replaced with an empty string.
- * @param dflt Default value.
- * @return String value.
- */
- public static String parameter(Configuration cfg, String name, String authority, String dflt) {
- return cfg.get(String.format(name, authority != null ? authority : ""), dflt);
- }
-
- /**
- * Get integer parameter.
- *
- * @param cfg Configuration.
- * @param name Parameter name pattern, formatted with the authority.
- * @param authority Authority; {@code null} is replaced with an empty string.
- * @param dflt Default value.
- * @return Integer value.
- * @throws IOException In case of parse exception.
- */
- public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException {
- String name0 = String.format(name, authority != null ? authority : "");
-
- try {
- return cfg.getInt(name0, dflt);
- }
- catch (NumberFormatException ignore) {
- throw new IOException("Failed to parse parameter value to integer: " + name0); // Cause is not chained.
- }
- }
-
- /**
- * Get boolean parameter.
- *
- * @param cfg Configuration.
- * @param name Parameter name pattern, formatted with the authority.
- * @param authority Authority; {@code null} is replaced with an empty string.
- * @param dflt Default value.
- * @return Boolean value.
- */
- public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) {
- return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt);
- }
-
- /**
- * Cast Ignite exception to appropriate IO exception, without a path for the resulting exception.
- *
- * @param e Exception to cast.
- * @return Casted exception.
- */
- public static IOException cast(IgniteCheckedException e) {
- return cast(e, null);
- }
-
- /**
- * Cast Ignite exception to appropriate IO exception. The result is returned, not thrown.
- *
- * @param e Exception to cast.
- * @param path Path for exceptions; when {@code null} the "not empty" and "already exists" mappings are skipped.
- * @return Casted exception.
- */
- @SuppressWarnings("unchecked")
- public static IOException cast(IgniteCheckedException e, @Nullable String path) {
- assert e != null;
-
- // First check for any nested IOException; if one exists, return it as is.
- if (e.hasCause(IOException.class))
- return e.getCause(IOException.class);
- else if (e.hasCause(IgfsPathNotFoundException.class))
- return new FileNotFoundException(path); // TODO: Or PathNotFoundException?
- else if (e.hasCause(IgfsParentNotDirectoryException.class))
- return new ParentNotDirectoryException(path);
- else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class))
- return new PathIsNotEmptyDirectoryException(path);
- else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class))
- return new PathExistsException(path);
- else {
- String msg = e.getMessage();
-
- return msg == null ? new IOException(e) : new IOException(msg, e); // Fallback keeps the original as cause.
- }
- }
-
- /**
- * Deletes all files from the given file system by recursively deleting every entry listed under the root.
- *
- * @param fs The file system to clean up.
- * @throws IOException On error.
- */
- public static void clear(FileSystem fs) throws IOException {
- // Delete root contents:
- FileStatus[] statuses = fs.listStatus(new Path("/"));
-
- if (statuses != null) {
- for (FileStatus stat: statuses)
- fs.delete(stat.getPath(), true);
- }
- }
-
- /**
- * Deletes all files from the given file system by recursively deleting every entry listed under the root.
- *
- * @param fs The file system to clean up.
- * @throws IOException On error.
- */
- public static void clear(AbstractFileSystem fs) throws IOException {
- // Delete root contents:
- FileStatus[] statuses = fs.listStatus(new Path("/"));
-
- if (statuses != null) {
- for (FileStatus stat: statuses)
- fs.delete(stat.getPath(), true);
- }
- }
-
- /**
- * Constructor; private because the class only hosts static members.
- */
- private HadoopIgfsUtils() {
- // No-op.
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
deleted file mode 100644
index f4ee97f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.IgniteIllegalStateException;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathSummary;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.SB;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.IgniteState.STARTED;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.LOCALHOST;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
-
-/**
- * Wrapper for IGFS server.
- */
-public class HadoopIgfsWrapper implements HadoopIgfs {
- /** Current delegate; {@code null} when none is set, e.g. after {@link #close(boolean)} cleared it. */
- private final AtomicReference<Delegate> delegateRef = new AtomicReference<>();
-
- /** Authority (connection string) the endpoint is parsed from. */
- private final String authority;
-
- /** Endpoint parsed from the authority. */
- private final HadoopIgfsEndpoint endpoint;
-
- /** Log directory. */
- private final String logDir;
-
- /** Configuration. */
- private final Configuration conf;
-
- /** Logger. */
- private final Log log;
-
- /** The user name this wrapper works on behalf of. */
- private final String userName;
-
- /**
- * @param authority Authority (connection string).
- * @param logDir Log directory for server.
- * @param conf Configuration.
- * @param log Current logger.
- * @param user Name of the user this wrapper works on behalf of.
- * @throws IOException If the endpoint cannot be created from the authority.
- */
- public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user)
- throws IOException {
- try {
- this.authority = authority;
- this.endpoint = new HadoopIgfsEndpoint(authority);
- this.logDir = logDir;
- this.conf = conf;
- this.log = log;
- this.userName = user;
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to parse endpoint: " + authority, e);
- }
- }
-
- /** {@inheritDoc} Returns the handshake response held by the delegate; the {@code logDir} argument is not read. */
- @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() {
- @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop,
- IgfsHandshakeResponse hndResp) {
- return hndResp;
- }
- });
- }
-
- /** {@inheritDoc} Clears the current delegate reference and closes that delegate, if one is set. */
- @Override public void close(boolean force) {
- Delegate delegate = delegateRef.get();
-
- if (delegate != null && delegateRef.compareAndSet(delegate, null)) // Only the caller that cleared the ref closes.
- delegate.close(force);
- }
-
- /** {@inheritDoc} Calls {@code info(path)} on the delegate with reconnect handling; {@code path} is used for errors. */
- @Override public IgfsFile info(final IgfsPath path) throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
- @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.info(path);
- }
- }, path);
- }
-
- /** {@inheritDoc} Calls {@code update(path, props)} on the delegate with reconnect handling. */
- @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
- @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.update(path, props);
- }
- }, path);
- }
-
- /** {@inheritDoc} Calls {@code setTimes(path, accessTime, modificationTime)} on the delegate with reconnect handling. */
- @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime)
- throws IOException {
- return withReconnectHandling(new FileSystemClosure<Boolean>() {
- @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.setTimes(path, accessTime, modificationTime);
- }
- }, path);
- }
-
- /** {@inheritDoc} Calls {@code rename(src, dest)} on the delegate; errors are reported against {@code src}. */
- @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Boolean>() {
- @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.rename(src, dest);
- }
- }, src);
- }
-
- /** {@inheritDoc} Calls {@code delete(path, recursive)} on the delegate with reconnect handling. */
- @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Boolean>() {
- @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.delete(path, recursive);
- }
- }, path);
- }
-
- /** {@inheritDoc} Calls {@code affinity(path, start, len)} on the delegate with reconnect handling. */
- @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start,
- final long len) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() {
- @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.affinity(path, start, len);
- }
- }, path);
- }
-
- /** {@inheritDoc} Calls {@code contentSummary(path)} on the delegate with reconnect handling. */
- @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() {
- @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.contentSummary(path);
- }
- }, path);
- }
-
- /** {@inheritDoc} Calls {@code mkdirs(path, props)} on the delegate with reconnect handling. */
- @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Boolean>() {
- @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.mkdirs(path, props);
- }
- }, path);
- }
-
- /** {@inheritDoc} Calls {@code listFiles(path)} on the delegate with reconnect handling. */
- @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() {
- @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.listFiles(path);
- }
- }, path);
- }
-
- /** {@inheritDoc} Calls {@code listPaths(path)} on the delegate with reconnect handling. */
- @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException {
- return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() {
- @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.listPaths(path);
- }
- }, path);
- }
-
- /** {@inheritDoc} Calls {@code fsStatus()} on the delegate with reconnect handling; no path is attached to errors. */
- @Override public IgfsStatus fsStatus() throws IOException {
- return withReconnectHandling(new FileSystemClosure<IgfsStatus>() {
- @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
- throws IgniteCheckedException, IOException {
- return hadoop.fsStatus();
- }
- });
- }
-
- /** {@inheritDoc} Calls {@code open(path)} on the delegate with reconnect handling. */
- @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException {
- return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
- @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.open(path);
- }
- }, path);
- }
-
- /** {@inheritDoc} Calls {@code open(path, seqReadsBeforePrefetch)} on the delegate with reconnect handling. */
- @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
- throws IOException {
- return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
- @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.open(path, seqReadsBeforePrefetch);
- }
- }, path);
- }
-
- /** {@inheritDoc} Passes all arguments unchanged to {@code create(...)} on the delegate with reconnect handling. */
- @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite,
- final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props)
- throws IOException {
- return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
- @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.create(path, overwrite, colocate, replication, blockSize, props);
- }
- }, path);
- }
-
- /** {@inheritDoc} Calls {@code append(path, create, props)} on the delegate with reconnect handling. */
- @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
- @Nullable final Map<String, String> props) throws IOException {
- return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
- @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
- IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
- return hadoop.append(path, create, props);
- }
- }, path);
- }
-
- /**
- * Execute closure which is not path-specific (no path is attached to the resulting exceptions).
- *
- * @param clo Closure.
- * @return Result.
- * @throws IOException If failed.
- */
- private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException {
- return withReconnectHandling(clo, null);
- }
-
- /**
- * Execute closure on the current delegate; after a communication failure one more attempt is made.
- *
- * @param clo Closure.
- * @param path Path for exceptions.
- * @return Result.
- * @throws IOException If failed.
- */
- private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path)
- throws IOException {
- Exception err = null;
-
- for (int i = 0; i < 2; i++) { // At most two attempts.
- Delegate curDelegate = null;
-
- boolean close = false;
- boolean force = false;
-
- try {
- curDelegate = delegate();
-
- assert curDelegate != null;
-
- close = curDelegate.doomed; // A delegate flagged as doomed is closed (not forced) once this attempt ends.
-
- return clo.apply(curDelegate.hadoop, curDelegate.hndResp);
- }
- catch (HadoopIgfsCommunicationException e) {
- if (curDelegate != null && !curDelegate.doomed) {
- // Try getting rid of faulty delegate ASAP.
- delegateRef.compareAndSet(curDelegate, null);
-
- close = true;
- force = true;
- }
-
- if (log.isDebugEnabled())
- log.debug("Failed to send message to a server: " + e);
-
- err = e; // Remembered for the final exception; the loop proceeds to the next attempt.
- }
- catch (IgniteCheckedException e) {
- throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null); // Not retried.
- }
- finally {
- if (close) {
- assert curDelegate != null;
-
- curDelegate.close(force);
- }
- }
- }
-
- List<Throwable> list = X.getThrowableList(err);
-
- Throwable cause = list.get(list.size() - 1); // Last list element; presumably the root cause - confirm.
-
- throw new IOException("Failed to communicate with IGFS: "
- + (cause.getMessage() == null ? cause.toString() : cause.getMessage()), err);
- }
-
- /**
-  * Get delegate creating it if needed.
-  * <p>
-  * If a delegate is already published in {@code delegateRef} it is returned as is. Otherwise connection
-  * modes are tried in this order, each one only if no delegate has been created yet:
-  * <ol>
-  *     <li>in-process, unless disabled by {@code PARAM_IGFS_ENDPOINT_NO_EMBED};</li>
-  *     <li>shared memory, unless disabled by {@code PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM} or on Windows;</li>
-  *     <li>TCP to {@code LOCALHOST}, unless disabled by {@code PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP};</li>
-  *     <li>TCP to the endpoint host, when local TCP is disabled or the endpoint host is not
-  *         {@code LOCALHOST}.</li>
-  * </ol>
-  * A freshly created delegate that fails to be published (the reference was set in the meantime) is
-  * marked as doomed and still returned.
-  *
-  * @return Delegate.
-  * @throws HadoopIgfsCommunicationException If none of the connection modes succeeded.
-  */
- private Delegate delegate() throws HadoopIgfsCommunicationException {
-     // These fields will contain possible exceptions from shmem and TCP endpoints.
-     Exception errShmem = null;
-     Exception errTcp = null;
-
-     // 1. If delegate is set, return it immediately.
-     Delegate curDelegate = delegateRef.get();
-
-     if (curDelegate != null)
-         return curDelegate;
-
-     // 2. Guess that we are in the same VM.
-     boolean skipInProc = parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false);
-
-     if (!skipInProc) {
-         IgfsEx igfs = getIgfsEx(endpoint.grid(), endpoint.igfs());
-
-         if (igfs != null) {
-             HadoopIgfsEx hadoop = null;
-
-             try {
-                 hadoop = new HadoopIgfsInProc(igfs, log, userName);
-
-                 curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-             }
-             catch (IOException | IgniteCheckedException e) {
-                 // The handler may not have been constructed yet, hence the null check.
-                 if (e instanceof HadoopIgfsCommunicationException && hadoop != null)
-                     hadoop.close(true);
-
-                 if (log.isDebugEnabled())
-                     log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e);
-             }
-         }
-     }
-
-     // 3. Try connecting using shmem.
-     boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false);
-
-     if (curDelegate == null && !skipLocShmem && !U.isWindows()) {
-         HadoopIgfsEx hadoop = null;
-
-         try {
-             hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
-
-             curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-         }
-         catch (IOException | IgniteCheckedException e) {
-             // Same guard as the in-process branch: avoid NPE if the constructor itself failed.
-             if (e instanceof HadoopIgfsCommunicationException && hadoop != null)
-                 hadoop.close(true);
-
-             if (log.isDebugEnabled())
-                 log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e);
-
-             errShmem = e;
-         }
-     }
-
-     // 4. Try local TCP connection.
-     boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
-
-     if (curDelegate == null && !skipLocTcp) {
-         HadoopIgfsEx hadoop = null;
-
-         try {
-             hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
-                 log, userName);
-
-             curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-         }
-         catch (IOException | IgniteCheckedException e) {
-             if (e instanceof HadoopIgfsCommunicationException && hadoop != null)
-                 hadoop.close(true);
-
-             // Report the host that was actually tried (LOCALHOST), not the configured endpoint host.
-             if (log.isDebugEnabled())
-                 log.debug("Failed to connect to IGFS using TCP [host=" + LOCALHOST +
-                     ", port=" + endpoint.port() + ']', e);
-
-             errTcp = e;
-         }
-     }
-
-     // 5. Try remote TCP connection.
-     if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
-         HadoopIgfsEx hadoop = null;
-
-         try {
-             hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(),
-                 log, userName);
-
-             curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-         }
-         catch (IOException | IgniteCheckedException e) {
-             if (e instanceof HadoopIgfsCommunicationException && hadoop != null)
-                 hadoop.close(true);
-
-             if (log.isDebugEnabled())
-                 log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
-                     ", port=" + endpoint.port() + ']', e);
-
-             errTcp = e;
-         }
-     }
-
-     if (curDelegate != null) {
-         // The reference was set while we were connecting: keep using our delegate for this call,
-         // but mark it so that it gets closed at the end of the next invocation.
-         if (!delegateRef.compareAndSet(null, curDelegate))
-             curDelegate.doomed = true;
-
-         return curDelegate;
-     }
-     else {
-         SB errMsg = new SB("Failed to connect to IGFS [endpoint=igfs://" + authority + ", attempts=[");
-
-         if (errShmem != null)
-             errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], ");
-
-         errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] ");
-
-         errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " +
-             "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint).");
-
-         throw new HadoopIgfsCommunicationException(errMsg.toString());
-     }
- }
-
- /**
-  * Unit of file system work executed against a delegate's RPC handler.
-  */
- private interface FileSystemClosure<T> {
-     /**
-      * Runs the closure body.
-      *
-      * @param hadoop RPC handler.
-      * @param hndResp Handshake response.
-      * @return Result of the operation.
-      * @throws IgniteCheckedException If failed.
-      * @throws IOException If failed.
-      */
-     T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException;
- }
-
- /**
-  * Pairs an RPC handler with the handshake response it was created with.
-  */
- private static class Delegate {
-     /** RPC handler. */
-     private final HadoopIgfsEx hadoop;
-
-     /** Handshake response. */
-     private final IgfsHandshakeResponse hndResp;
-
-     /** Close guard: flipped once so that the handler is closed at most one time through this delegate. */
-     private final AtomicBoolean closeGuard = new AtomicBoolean();
-
-     /** Whether this delegate must be closed at the end of the next invocation. */
-     private boolean doomed;
-
-     /**
-      * Constructor.
-      *
-      * @param hadoop RPC handler.
-      * @param hndResp Handshake response.
-      */
-     private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) {
-         this.hndResp = hndResp;
-         this.hadoop = hadoop;
-     }
-
-     /**
-      * Close underlying RPC handler. Only the first call has any effect.
-      *
-      * @param force Force flag, passed through to the handler.
-      */
-     private void close(boolean force) {
-         // Somebody has already closed this delegate.
-         if (!closeGuard.compareAndSet(false, true))
-             return;
-
-         hadoop.close(force);
-     }
- }
-
- /**
-  * Looks up the IGFS with the given name in the Ignite instance with the given name.
-  *
-  * @param gridName The name of the grid to check.
-  * @param igfsName The name of Igfs.
-  * @return The file system instance, or {@code null} if the grid is not in the started state,
-  *      has no file system with that name, or the lookup hit an illegal grid state.
-  */
- private static IgfsEx getIgfsEx(@Nullable String gridName, @Nullable String igfsName) {
-     // Nothing to look up unless the grid is up.
-     if (Ignition.state(gridName) != STARTED)
-         return null;
-
-     try {
-         for (IgniteFileSystem candidate : Ignition.ignite(gridName).fileSystems()) {
-             if (!F.eq(candidate.name(), igfsName))
-                 continue;
-
-             return (IgfsEx)candidate;
-         }
-     }
-     catch (IgniteIllegalStateException ignore) {
-         // May happen if the grid state has changed between the state check and the lookup.
-     }
-
-     return null;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
new file mode 100644
index 0000000..0ab64d9
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Hadoop counter group adapter: a view of a single named group that forwards
+ * every counter operation to the owning {@link HadoopMapReduceCounters}.
+ */
+class HadoopMapReduceCounterGroup implements CounterGroup {
+    /** Counters this group is a view of. */
+    private final HadoopMapReduceCounters cntrs;
+
+    /** Group name. */
+    private final String name;
+
+    /**
+     * Creates new instance.
+     *
+     * @param cntrs Client counters instance.
+     * @param name Group name.
+     */
+    HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
+        this.name = name;
+        this.cntrs = cntrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getDisplayName() {
+        // There is no separate display name: the group name doubles as one.
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDisplayName(String displayName) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addCounter(Counter counter) {
+        // Only the name and display name are taken from the argument; the value passed on is 0.
+        // NOTE(review): the argument's current value is not carried over - confirm this is intended.
+        String cntrName = counter.getName();
+        String cntrDisplayName = counter.getDisplayName();
+
+        addCounter(cntrName, cntrDisplayName, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter addCounter(String name, String displayName, long value) {
+        // The parameter shadows the field: this.name is the group, name is the counter.
+        final Counter cntr = cntrs.findCounter(this.name, name);
+
+        cntr.setValue(value);
+
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName, String displayName) {
+        // Display name is not used for the lookup.
+        return cntrs.findCounter(name, counterName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName, boolean create) {
+        return cntrs.findCounter(name, counterName, create);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName) {
+        return cntrs.findCounter(name, counterName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return cntrs.groupSize(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
+        for (final Counter other : rightGroup) {
+            // Counters are matched by name within this group.
+            Counter own = cntrs.findCounter(name, other.getName());
+
+            own.increment(other.getValue());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<Counter> iterator() {
+        return cntrs.iterateGroup(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+}
\ No newline at end of file