You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:37:35 UTC
[2/5] incubator-ignite git commit: # IGNITE-386: Moving classes (5).
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
new file mode 100644
index 0000000..c9c61fe
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.commons.logging.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.ipc.*;
+import org.apache.ignite.internal.util.ipc.shmem.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * IO layer implementation based on blocking IPC streams.
+ */
+@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+public class HadoopIgfsIpcIo implements HadoopIgfsIo {
+ /** Logger. */
+ private Log log;
+
+ /** Request futures map. */
+ private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =
+ new ConcurrentHashMap8<>();
+
+ /** Request ID counter. */
+ private AtomicLong reqIdCnt = new AtomicLong();
+
+ /** Endpoint. */
+ private IpcEndpoint endpoint;
+
+ /** Endpoint output stream. */
+ private IgfsDataOutputStream out;
+
+ /** Protocol. */
+ private final IgfsMarshaller marsh;
+
+ /** Client reader thread. */
+ private Thread reader;
+
+ /** Lock for graceful shutdown. */
+ private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
+
+ /** Stopping flag. */
+ private volatile boolean stopping;
+
+ /** Server endpoint address. */
+ private final String endpointAddr;
+
+ /** Number of open file system sessions. */
+ private final AtomicInteger activeCnt = new AtomicInteger(1);
+
+ /** Event listeners. */
+ private final Collection<HadoopIgfsIpcIoListener> lsnrs =
+ new GridConcurrentHashSet<>();
+
+ /** Cached connections. */
+ private static final ConcurrentMap<String, HadoopIgfsIpcIo> ipcCache =
+ new ConcurrentHashMap8<>();
+
+ /** Striped lock that prevents multiple instance creation in {@link #get(Log, String)}. */
+ private static final GridStripedLock initLock = new GridStripedLock(32);
+
+ /**
+ * @param endpointAddr Endpoint.
+ * @param marsh Protocol.
+ * @param log Logger to use.
+ */
+ public HadoopIgfsIpcIo(String endpointAddr, IgfsMarshaller marsh, Log log) {
+ assert endpointAddr != null;
+ assert marsh != null;
+
+ this.endpointAddr = endpointAddr;
+ this.marsh = marsh;
+ this.log = log;
+ }
+
+ /**
+ * Returns a started and valid instance of this class
+ * for a given endpoint.
+ *
+ * @param log Logger to use for new instance.
+ * @param endpoint Endpoint string.
+ * @return New or existing cached instance, which is started and operational.
+ * @throws IOException If new instance was created but failed to start.
+ */
+ public static HadoopIgfsIpcIo get(Log log, String endpoint) throws IOException {
+ while (true) {
+ HadoopIgfsIpcIo clientIo = ipcCache.get(endpoint);
+
+ if (clientIo != null) {
+ if (clientIo.acquire())
+ return clientIo;
+ else
+ // If concurrent close.
+ ipcCache.remove(endpoint, clientIo);
+ }
+ else {
+ Lock lock = initLock.getLock(endpoint);
+
+ lock.lock();
+
+ try {
+ clientIo = ipcCache.get(endpoint);
+
+ if (clientIo != null) { // Perform double check.
+ if (clientIo.acquire())
+ return clientIo;
+ else
+ // If concurrent close.
+ ipcCache.remove(endpoint, clientIo);
+ }
+
+ // Otherwise try creating a new one.
+ clientIo = new HadoopIgfsIpcIo(endpoint, new IgfsMarshaller(), log);
+
+ try {
+ clientIo.start();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+
+ HadoopIgfsIpcIo old = ipcCache.putIfAbsent(endpoint, clientIo);
+
+ // Put in exclusive lock.
+ assert old == null;
+
+ return clientIo;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+ }
+
+ /**
+ * Increases usage count for this instance.
+ *
+ * @return {@code true} if usage count is greater than zero.
+ */
+ private boolean acquire() {
+ while (true) {
+ int cnt = activeCnt.get();
+
+ if (cnt == 0) {
+ if (log.isDebugEnabled())
+ log.debug("IPC IO not acquired (count was 0): " + this);
+
+ return false;
+ }
+
+ // Need to make sure that no-one decremented count in between.
+ if (activeCnt.compareAndSet(cnt, cnt + 1)) {
+ if (log.isDebugEnabled())
+ log.debug("IPC IO acquired: " + this);
+
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Releases this instance, decrementing usage count.
+ * <p>
+ * If usage count becomes zero, the instance is stopped
+ * and removed from cache.
+ */
+ public void release() {
+ while (true) {
+ int cnt = activeCnt.get();
+
+ if (cnt == 0) {
+ if (log.isDebugEnabled())
+ log.debug("IPC IO not released (count was 0): " + this);
+
+ return;
+ }
+
+ if (activeCnt.compareAndSet(cnt, cnt - 1)) {
+ if (cnt == 1) {
+ ipcCache.remove(endpointAddr, this);
+
+ if (log.isDebugEnabled())
+ log.debug("IPC IO stopping as unused: " + this);
+
+ stop();
+ }
+ else if (log.isDebugEnabled())
+ log.debug("IPC IO released: " + this);
+
+ return;
+ }
+ }
+ }
+
+ /**
+ * Closes this IO instance, removing it from cache.
+ */
+ public void forceClose() {
+ if (ipcCache.remove(endpointAddr, this))
+ stop();
+ }
+
+ /**
+ * Starts the IO.
+ *
+ * @throws IgniteCheckedException If failed to connect the endpoint.
+ */
+ private void start() throws IgniteCheckedException {
+ boolean success = false;
+
+ try {
+ endpoint = IpcEndpointFactory.connectEndpoint(
+ endpointAddr, new GridLoggerProxy(new HadoopIgfsJclLogger(log), null, null, ""));
+
+ out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream()));
+
+ reader = new ReaderThread();
+
+ // Required for Hadoop 2.x
+ reader.setDaemon(true);
+
+ reader.start();
+
+ success = true;
+ }
+ catch (IgniteCheckedException e) {
+ IpcOutOfSystemResourcesException resEx = e.getCause(IpcOutOfSystemResourcesException.class);
+
+ if (resEx != null)
+ throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.OUT_OF_RESOURCES_MSG, resEx);
+
+ throw e;
+ }
+ finally {
+ if (!success)
+ stop();
+ }
+ }
+
+ /**
+ * Shuts down the IO. No send requests will be accepted anymore, all pending futures will be failed.
+ * Close listeners will be invoked as if connection is closed by server.
+ */
+ private void stop() {
+ close0(null);
+
+ if (reader != null) {
+ try {
+ U.interrupt(reader);
+ U.join(reader);
+
+ reader = null;
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ Thread.currentThread().interrupt();
+
+ log.warn("Got interrupted while waiting for reader thread to shut down (will return).");
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addEventListener(HadoopIgfsIpcIoListener lsnr) {
+ if (!busyLock.readLock().tryLock()) {
+ lsnr.onClose();
+
+ return;
+ }
+
+ boolean invokeNow = false;
+
+ try {
+ invokeNow = stopping;
+
+ if (!invokeNow)
+ lsnrs.add(lsnr);
+ }
+ finally {
+ busyLock.readLock().unlock();
+
+ if (invokeNow)
+ lsnr.onClose();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeEventListener(HadoopIgfsIpcIoListener lsnr) {
+ lsnrs.remove(lsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException {
+ return send(msg, null, 0, 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff,
+ int outLen) throws IgniteCheckedException {
+ assert outBuf == null || msg.command() == IgfsIpcCommand.READ_BLOCK;
+
+ if (!busyLock.readLock().tryLock())
+ throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " +
+ "closed).");
+
+ try {
+ if (stopping)
+ throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " +
+ "closed).");
+
+ long reqId = reqIdCnt.getAndIncrement();
+
+ HadoopIgfsFuture<T> fut = new HadoopIgfsFuture<>();
+
+ fut.outputBuffer(outBuf);
+ fut.outputOffset(outOff);
+ fut.outputLength(outLen);
+ fut.read(msg.command() == IgfsIpcCommand.READ_BLOCK);
+
+ HadoopIgfsFuture oldFut = reqMap.putIfAbsent(reqId, fut);
+
+ assert oldFut == null;
+
+ if (log.isDebugEnabled())
+ log.debug("Sending IGFS message [reqId=" + reqId + ", msg=" + msg + ']');
+
+ byte[] hdr = IgfsMarshaller.createHeader(reqId, msg.command());
+
+ IgniteCheckedException err = null;
+
+ try {
+ synchronized (this) {
+ marsh.marshall(msg, hdr, out);
+
+ out.flush(); // Blocking operation + sometimes system call.
+ }
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
+ }
+ catch (IOException e) {
+ err = new HadoopIgfsCommunicationException(e);
+ }
+
+ if (err != null) {
+ reqMap.remove(reqId, fut);
+
+ fut.onDone(err);
+ }
+
+ return fut;
+ }
+ finally {
+ busyLock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendPlain(IgfsMessage msg) throws IgniteCheckedException {
+ if (!busyLock.readLock().tryLock())
+ throw new HadoopIgfsCommunicationException("Failed to send message (client is being " +
+ "concurrently closed).");
+
+ try {
+ if (stopping)
+ throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently closed).");
+
+ assert msg.command() == IgfsIpcCommand.WRITE_BLOCK;
+
+ IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg;
+
+ byte[] hdr = IgfsMarshaller.createHeader(-1, IgfsIpcCommand.WRITE_BLOCK);
+
+ U.longToBytes(req.streamId(), hdr, 12);
+ U.intToBytes(req.length(), hdr, 20);
+
+ synchronized (this) {
+ out.write(hdr);
+ out.write(req.data(), (int)req.position(), req.length());
+
+ out.flush();
+ }
+ }
+ catch (IOException e) {
+ throw new HadoopIgfsCommunicationException(e);
+ }
+ finally {
+ busyLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Closes client but does not wait.
+ *
+ * @param err Error.
+ */
+ private void close0(@Nullable Throwable err) {
+ busyLock.writeLock().lock();
+
+ try {
+ if (stopping)
+ return;
+
+ stopping = true;
+ }
+ finally {
+ busyLock.writeLock().unlock();
+ }
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to perform request (connection was concurrently closed before response " +
+ "is received).");
+
+ // Clean up resources.
+ U.closeQuiet(out);
+
+ if (endpoint != null)
+ endpoint.close();
+
+ // Unwind futures. We can safely iterate here because no more futures will be added.
+ Iterator<HadoopIgfsFuture> it = reqMap.values().iterator();
+
+ while (it.hasNext()) {
+ HadoopIgfsFuture fut = it.next();
+
+ fut.onDone(err);
+
+ it.remove();
+ }
+
+ for (HadoopIgfsIpcIoListener lsnr : lsnrs)
+ lsnr.onClose();
+ }
+
+ /**
+ * Do not extend {@code GridThread} to minimize class dependencies.
+ */
+ private class ReaderThread extends Thread {
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void run() {
+ // Error to fail pending futures.
+ Throwable err = null;
+
+ try {
+ InputStream in = endpoint.inputStream();
+
+ IgfsDataInputStream dis = new IgfsDataInputStream(in);
+
+ byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE];
+ byte[] msgHdr = new byte[IgfsControlResponse.RES_HEADER_SIZE];
+
+ while (!Thread.currentThread().isInterrupted()) {
+ dis.readFully(hdr);
+
+ long reqId = U.bytesToLong(hdr, 0);
+
+ // We don't wait for write responses, therefore reqId is -1.
+ if (reqId == -1) {
+ // We received a response which normally should not be sent. It must contain an error.
+ dis.readFully(msgHdr);
+
+ assert msgHdr[4] != 0;
+
+ String errMsg = dis.readUTF();
+
+ // Error code.
+ dis.readInt();
+
+ long streamId = dis.readLong();
+
+ for (HadoopIgfsIpcIoListener lsnr : lsnrs)
+ lsnr.onError(streamId, errMsg);
+ }
+ else {
+ HadoopIgfsFuture<Object> fut = reqMap.remove(reqId);
+
+ if (fut == null) {
+ String msg = "Failed to read response from server: response closure is unavailable for " +
+ "requestId (will close connection):" + reqId;
+
+ log.warn(msg);
+
+ err = new IgniteCheckedException(msg);
+
+ break;
+ }
+ else {
+ try {
+ IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(U.bytesToInt(hdr, 8));
+
+ if (log.isDebugEnabled())
+ log.debug("Received IGFS response [reqId=" + reqId + ", cmd=" + cmd + ']');
+
+ Object res = null;
+
+ if (fut.read()) {
+ dis.readFully(msgHdr);
+
+ boolean hasErr = msgHdr[4] != 0;
+
+ if (hasErr) {
+ String errMsg = dis.readUTF();
+
+ // Error code.
+ Integer errCode = dis.readInt();
+
+ IgfsControlResponse.throwError(errCode, errMsg);
+ }
+
+ int blockLen = U.bytesToInt(msgHdr, 5);
+
+ int readLen = Math.min(blockLen, fut.outputLength());
+
+ if (readLen > 0) {
+ assert fut.outputBuffer() != null;
+
+ dis.readFully(fut.outputBuffer(), fut.outputOffset(), readLen);
+ }
+
+ if (readLen != blockLen) {
+ byte[] buf = new byte[blockLen - readLen];
+
+ dis.readFully(buf);
+
+ res = buf;
+ }
+ }
+ else
+ res = marsh.unmarshall(cmd, hdr, dis);
+
+ fut.onDone(res);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to apply response closure (will fail request future): " +
+ e.getMessage());
+
+ fut.onDone(e);
+
+ err = e;
+ }
+ }
+ }
+ }
+ }
+ catch (EOFException ignored) {
+ err = new IgniteCheckedException("Failed to read response from server (connection was closed by remote peer).");
+ }
+ catch (IOException e) {
+ if (!stopping)
+ log.error("Failed to read data (connection will be closed)", e);
+
+ err = new HadoopIgfsCommunicationException(e);
+ }
+ catch (IgniteCheckedException e) {
+ if (!stopping)
+ log.error("Failed to obtain endpoint input stream (connection will be closed)", e);
+
+ err = e;
+ }
+ finally {
+ close0(err);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return getClass().getSimpleName() + " [endpointAddr=" + endpointAddr + ", activeCnt=" + activeCnt +
+ ", stopping=" + stopping + ']';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java
new file mode 100644
index 0000000..c2dad82
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+/**
+ * Listens to the events of {@link HadoopIgfsIpcIo}.
+ */
+public interface HadoopIgfsIpcIoListener {
+ /**
+ * Callback invoked when the IO is being closed.
+ */
+ public void onClose();
+
+ /**
+ * Callback invoked when remote error occurs.
+ *
+ * @param streamId Stream ID.
+ * @param errMsg Error message.
+ */
+ public void onError(long streamId, String errMsg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java
new file mode 100644
index 0000000..35fd27c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.commons.logging.*;
+import org.apache.ignite.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * JCL logger wrapper for Hadoop.
+ */
+public class HadoopIgfsJclLogger implements IgniteLogger {
+ /** JCL implementation proxy. */
+ private Log impl;
+
+ /**
+ * Constructor.
+ *
+ * @param impl JCL implementation to use.
+ */
+ HadoopIgfsJclLogger(Log impl) {
+ assert impl != null;
+
+ this.impl = impl;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteLogger getLogger(Object ctgr) {
+ return new HadoopIgfsJclLogger(LogFactory.getLog(
+ ctgr instanceof Class ? ((Class)ctgr).getName() : String.valueOf(ctgr)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void trace(String msg) {
+ impl.trace(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void debug(String msg) {
+ impl.debug(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void info(String msg) {
+ impl.info(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void warning(String msg) {
+ impl.warn(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void warning(String msg, @Nullable Throwable e) {
+ impl.warn(msg, e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void error(String msg) {
+ impl.error(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isQuiet() {
+ return !isInfoEnabled() && !isDebugEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void error(String msg, @Nullable Throwable e) {
+ impl.error(msg, e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isTraceEnabled() {
+ return impl.isTraceEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDebugEnabled() {
+ return impl.isDebugEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isInfoEnabled() {
+ return impl.isInfoEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String fileName() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "IgfsHadoopJclLogger [impl=" + impl + ']';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
new file mode 100644
index 0000000..662541a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.commons.logging.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.*;
+
+/**
+ * Communication with external process (TCP or shmem).
+ */
+public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener {
+ /** Expected result is boolean. */
+ private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure();
+
+ /** Expected result is boolean. */
+ private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Long> LONG_RES = createClosure();
+
+ /** Expected result is {@code IgfsFile}. */
+ private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure();
+
+ /** Expected result is {@code IgfsHandshakeResponse} */
+ private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
+ IgfsHandshakeResponse> HANDSHAKE_RES = createClosure();
+
+ /** Expected result is {@code IgfsStatus} */
+ private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsStatus> STATUS_RES =
+ createClosure();
+
+ /** Expected result is {@code IgfsFile}. */
+ private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
+ IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure();
+
+ /** Expected result is {@code IgfsFile}. */
+ private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
+ Collection<IgfsFile>> FILE_COL_RES = createClosure();
+
+ /** Expected result is {@code IgfsFile}. */
+ private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
+ Collection<IgfsPath>> PATH_COL_RES = createClosure();
+
+ /** Expected result is {@code IgfsPathSummary}. */
+ private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES =
+ createClosure();
+
+ /** Expected result is {@code IgfsFile}. */
+ private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
+ Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure();
+
+ /** Grid name. */
+ private final String grid;
+
+ /** IGFS name. */
+ private final String igfs;
+
+ /** Client log. */
+ private final Log log;
+
+ /** Client IO. */
+ private final HadoopIgfsIpcIo io;
+
+ /** Event listeners. */
+ private final Map<Long, HadoopIgfsStreamEventListener> lsnrs = new ConcurrentHashMap8<>();
+
+ /**
+ * Constructor for TCP endpoint.
+ *
+ * @param host Host.
+ * @param port Port.
+ * @param grid Grid name.
+ * @param igfs IGFS name.
+ * @param log Client logger.
+ * @throws IOException If failed.
+ */
+ public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException {
+ this(host, port, grid, igfs, false, log);
+ }
+
+ /**
+ * Constructor for shmem endpoint.
+ *
+ * @param port Port.
+ * @param grid Grid name.
+ * @param igfs IGFS name.
+ * @param log Client logger.
+ * @throws IOException If failed.
+ */
+ public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException {
+ this(null, port, grid, igfs, true, log);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param host Host.
+ * @param port Port.
+ * @param grid Grid name.
+ * @param igfs IGFS name.
+ * @param shmem Shared memory flag.
+ * @param log Client logger.
+ * @throws IOException If failed.
+ */
+ private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log)
+ throws IOException {
+ assert host != null && !shmem || host == null && shmem :
+ "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
+
+ String endpoint = host != null ? host + ":" + port : "shmem:" + port;
+
+ this.grid = grid;
+ this.igfs = igfs;
+ this.log = log;
+
+ io = HadoopIgfsIpcIo.get(log, endpoint);
+
+ io.addEventListener(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException {
+ final IgfsHandshakeRequest req = new IgfsHandshakeRequest();
+
+ req.gridName(grid);
+ req.igfsName(igfs);
+ req.logDirectory(logDir);
+
+ return io.send(req).chain(HANDSHAKE_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close(boolean force) {
+ assert io != null;
+
+ io.removeEventListener(this);
+
+ if (force)
+ io.forceClose();
+ else
+ io.release();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(INFO);
+ msg.path(path);
+
+ return io.send(msg).chain(FILE_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(UPDATE);
+ msg.path(path);
+ msg.properties(props);
+
+ return io.send(msg).chain(FILE_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(SET_TIMES);
+ msg.path(path);
+ msg.accessTime(accessTime);
+ msg.modificationTime(modificationTime);
+
+ return io.send(msg).chain(BOOL_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(RENAME);
+ msg.path(src);
+ msg.destinationPath(dest);
+
+ return io.send(msg).chain(BOOL_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(DELETE);
+ msg.path(path);
+ msg.flag(recursive);
+
+ return io.send(msg).chain(BOOL_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+ throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(AFFINITY);
+ msg.path(path);
+ msg.start(start);
+ msg.length(len);
+
+ return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(PATH_SUMMARY);
+ msg.path(path);
+
+ return io.send(msg).chain(SUMMARY_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(MAKE_DIRECTORIES);
+ msg.path(path);
+ msg.properties(props);
+
+ return io.send(msg).chain(BOOL_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(LIST_FILES);
+ msg.path(path);
+
+ return io.send(msg).chain(FILE_COL_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(LIST_PATHS);
+ msg.path(path);
+
+ return io.send(msg).chain(PATH_COL_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
+ return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(OPEN_READ);
+ msg.path(path);
+ msg.flag(false);
+
+ IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
+
+ return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate open(IgfsPath path,
+ int seqReadsBeforePrefetch) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(OPEN_READ);
+ msg.path(path);
+ msg.flag(true);
+ msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
+
+ IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
+
+ return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
+ int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(OPEN_CREATE);
+ msg.path(path);
+ msg.flag(overwrite);
+ msg.colocate(colocate);
+ msg.properties(props);
+ msg.replication(replication);
+ msg.blockSize(blockSize);
+
+ Long streamId = io.send(msg).chain(LONG_RES).get();
+
+ return new HadoopIgfsStreamDelegate(this, streamId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
+ @Nullable Map<String, String> props) throws IgniteCheckedException {
+ final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+ msg.command(OPEN_APPEND);
+ msg.path(path);
+ msg.flag(create);
+ msg.properties(props);
+
+ Long streamId = io.send(msg).chain(LONG_RES).get();
+
+ return new HadoopIgfsStreamDelegate(this, streamId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len,
+ final @Nullable byte[] outBuf, final int outOff, final int outLen) {
+ assert len > 0;
+
+ final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
+
+ msg.command(READ_BLOCK);
+ msg.streamId((long) desc.target());
+ msg.position(pos);
+ msg.length(len);
+
+ try {
+ return io.send(msg, outBuf, outOff, outLen);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridPlainFutureAdapter<>(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeData(HadoopIgfsStreamDelegate desc, byte[] data, int off, int len)
+ throws IOException {
+ final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
+
+ msg.command(WRITE_BLOCK);
+ msg.streamId((long) desc.target());
+ msg.data(data);
+ msg.position(off);
+ msg.length(len);
+
+ try {
+ io.sendPlain(msg);
+ }
+ catch (IgniteCheckedException e) {
+ throw HadoopIgfsUtils.cast(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException {
+ final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
+
+ msg.command(CLOSE);
+ msg.streamId((long)desc.target());
+
+ try {
+ io.send(msg).chain(BOOL_RES).get();
+ }
+ catch (IgniteCheckedException e) {
+ throw HadoopIgfsUtils.cast(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addEventListener(HadoopIgfsStreamDelegate desc,
+ HadoopIgfsStreamEventListener lsnr) {
+ long streamId = desc.target();
+
+ HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr);
+
+ assert lsnr0 == null || lsnr0 == lsnr;
+
+ if (log.isDebugEnabled())
+ log.debug("Added stream event listener [streamId=" + streamId + ']');
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeEventListener(HadoopIgfsStreamDelegate desc) {
+ long streamId = desc.target();
+
+ HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(streamId);
+
+ if (lsnr0 != null && log.isDebugEnabled())
+ log.debug("Removed stream event listener [streamId=" + streamId + ']');
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClose() {
+ for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) {
+ try {
+ lsnr.onClose();
+ }
+ catch (IgniteCheckedException e) {
+ log.warn("Got exception from stream event listener (will ignore): " + lsnr, e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onError(long streamId, String errMsg) {
+ HadoopIgfsStreamEventListener lsnr = lsnrs.get(streamId);
+
+ if (lsnr != null)
+ lsnr.onError(errMsg);
+ else
+ log.warn("Received write error response for not registered output stream (will ignore) " +
+ "[streamId= " + streamId + ']');
+ }
+
+ /**
+ * Creates conversion closure for given type.
+ *
+ * @param <T> Type of expected result.
+ * @return Conversion closure.
+ */
+ @SuppressWarnings("unchecked")
+ private static <T> GridPlainClosure<GridPlainFuture<IgfsMessage>, T> createClosure() {
+ return new GridPlainClosure<GridPlainFuture<IgfsMessage>, T>() {
+ @Override public T apply(GridPlainFuture<IgfsMessage> fut) throws IgniteCheckedException {
+ IgfsControlResponse res = (IgfsControlResponse)fut.get();
+
+ if (res.hasError())
+ res.throwError();
+
+ return (T)res.response();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java
new file mode 100644
index 0000000..902d710
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.commons.logging.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * IGFS Hadoop output stream implementation.
+ */
+public class HadoopIgfsOutputStream extends OutputStream implements HadoopIgfsStreamEventListener {
+ /** Log instance. */
+ private Log log;
+
+ /** Client logger. */
+ private IgfsLogger clientLog;
+
+ /** Log stream ID. */
+ private long logStreamId;
+
+ /** Server stream delegate. */
+ private HadoopIgfsStreamDelegate delegate;
+
+ /** Closed flag. */
+ private volatile boolean closed;
+
+ /** Flag set if stream was closed due to connection breakage. */
+ private boolean connBroken;
+
+ /** Error message. */
+ private volatile String errMsg;
+
+ /** Read time. */
+ private long writeTime;
+
+ /** User time. */
+ private long userTime;
+
+ /** Last timestamp. */
+ private long lastTs;
+
+ /** Amount of written bytes. */
+ private long total;
+
+ /**
+ * Creates light output stream.
+ *
+ * @param delegate Server stream delegate.
+ * @param log Logger to use.
+ * @param clientLog Client logger.
+ */
+ public HadoopIgfsOutputStream(HadoopIgfsStreamDelegate delegate, Log log,
+ IgfsLogger clientLog, long logStreamId) {
+ this.delegate = delegate;
+ this.log = log;
+ this.clientLog = clientLog;
+ this.logStreamId = logStreamId;
+
+ lastTs = System.nanoTime();
+
+ delegate.hadoop().addEventListener(delegate, this);
+ }
+
+ /**
+ * Read start.
+ */
+ private void writeStart() {
+ long now = System.nanoTime();
+
+ userTime += now - lastTs;
+
+ lastTs = now;
+ }
+
+ /**
+ * Read end.
+ */
+ private void writeEnd() {
+ long now = System.nanoTime();
+
+ writeTime += now - lastTs;
+
+ lastTs = now;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(@NotNull byte[] b, int off, int len) throws IOException {
+ check();
+
+ writeStart();
+
+ try {
+ delegate.hadoop().writeData(delegate, b, off, len);
+
+ total += len;
+ }
+ finally {
+ writeEnd();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(int b) throws IOException {
+ write(new byte[] {(byte)b});
+
+ total++;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void flush() throws IOException {
+ delegate.hadoop().flush(delegate);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ if (!closed) {
+ if (log.isDebugEnabled())
+ log.debug("Closing output stream: " + delegate);
+
+ writeStart();
+
+ delegate.hadoop().closeStream(delegate);
+
+ markClosed(false);
+
+ writeEnd();
+
+ if (clientLog.isLogEnabled())
+ clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
+
+ if (log.isDebugEnabled())
+ log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 +
+ ", userTime=" + userTime / 1000 + ']');
+ }
+ else if(connBroken)
+ throw new IOException(
+ "Failed to close stream, because connection was broken (data could have been lost).");
+ }
+
+ /**
+ * Marks stream as closed.
+ *
+ * @param connBroken {@code True} if connection with server was lost.
+ */
+ private void markClosed(boolean connBroken) {
+ // It is ok to have race here.
+ if (!closed) {
+ closed = true;
+
+ delegate.hadoop().removeEventListener(delegate);
+
+ this.connBroken = connBroken;
+ }
+ }
+
+ /**
+ * @throws IOException If check failed.
+ */
+ private void check() throws IOException {
+ String errMsg0 = errMsg;
+
+ if (errMsg0 != null)
+ throw new IOException(errMsg0);
+
+ if (closed) {
+ if (connBroken)
+ throw new IOException("Server connection was lost.");
+ else
+ throw new IOException("Stream is closed.");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClose() throws IgniteCheckedException {
+ markClosed(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onError(String errMsg) {
+ this.errMsg = errMsg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
new file mode 100644
index 0000000..f99f14c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.hadoop.fs.permission.*;
+import org.apache.ignite.*;
+
+import java.util.*;
+
+import static org.apache.ignite.IgniteFs.*;
+
+/**
+ * Hadoop file system properties.
+ */
+public class HadoopIgfsProperties {
+ /** Username. */
+ private String usrName;
+
+ /** Group name. */
+ private String grpName;
+
+ /** Permissions. */
+ private FsPermission perm;
+
+ /**
+ * Constructor.
+ *
+ * @param props Properties.
+ * @throws IgniteException In case of error.
+ */
+ public HadoopIgfsProperties(Map<String, String> props) throws IgniteException {
+ usrName = props.get(PROP_USER_NAME);
+ grpName = props.get(PROP_GROUP_NAME);
+
+ String permStr = props.get(PROP_PERMISSION);
+
+ if (permStr != null) {
+ try {
+ perm = new FsPermission((short)Integer.parseInt(permStr, 8));
+ }
+ catch (NumberFormatException ignore) {
+ throw new IgniteException("Permissions cannot be parsed: " + permStr);
+ }
+ }
+ }
+
+ /**
+ * Get user name.
+ *
+ * @return User name.
+ */
+ public String userName() {
+ return usrName;
+ }
+
+ /**
+ * Get group name.
+ *
+ * @return Group name.
+ */
+ public String groupName() {
+ return grpName;
+ }
+
+ /**
+ * Get permission.
+ *
+ * @return Permission.
+ */
+ public FsPermission permission() {
+ return perm;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
new file mode 100644
index 0000000..4530e64
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.internal.igfs.common.*;
+
+import java.io.*;
+
+/**
+ * Secondary Hadoop file system input stream wrapper.
+ */
+public class HadoopIgfsProxyInputStream extends InputStream implements Seekable, PositionedReadable {
+ /** Actual input stream to the secondary file system. */
+ private final FSDataInputStream is;
+
+ /** Client logger. */
+ private final IgfsLogger clientLog;
+
+ /** Log stream ID. */
+ private final long logStreamId;
+
+ /** Read time. */
+ private long readTime;
+
+ /** User time. */
+ private long userTime;
+
+ /** Last timestamp. */
+ private long lastTs;
+
+ /** Amount of read bytes. */
+ private long total;
+
+ /** Closed flag. */
+ private boolean closed;
+
+ /**
+ * Constructor.
+ *
+ * @param is Actual input stream to the secondary file system.
+ * @param clientLog Client log.
+ */
+ public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) {
+ assert is != null;
+ assert clientLog != null;
+
+ this.is = is;
+ this.clientLog = clientLog;
+ this.logStreamId = logStreamId;
+
+ lastTs = System.nanoTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read(byte[] b) throws IOException {
+ readStart();
+
+ int res;
+
+ try {
+ res = is.read(b);
+ }
+ finally {
+ readEnd();
+ }
+
+ if (res != -1)
+ total += res;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
+ readStart();
+
+ int res;
+
+ try {
+ res = super.read(b, off, len);
+ }
+ finally {
+ readEnd();
+ }
+
+ if (res != -1)
+ total += res;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized long skip(long n) throws IOException {
+ readStart();
+
+ long res;
+
+ try {
+ res = is.skip(n);
+ }
+ finally {
+ readEnd();
+ }
+
+ if (clientLog.isLogEnabled())
+ clientLog.logSkip(logStreamId, res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int available() throws IOException {
+ readStart();
+
+ try {
+ return is.available();
+ }
+ finally {
+ readEnd();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void close() throws IOException {
+ if (!closed) {
+ closed = true;
+
+ readStart();
+
+ try {
+ is.close();
+ }
+ finally {
+ readEnd();
+ }
+
+ if (clientLog.isLogEnabled())
+ clientLog.logCloseIn(logStreamId, userTime, readTime, total);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void mark(int readLimit) {
+ readStart();
+
+ try {
+ is.mark(readLimit);
+ }
+ finally {
+ readEnd();
+ }
+
+ if (clientLog.isLogEnabled())
+ clientLog.logMark(logStreamId, readLimit);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void reset() throws IOException {
+ readStart();
+
+ try {
+ is.reset();
+ }
+ finally {
+ readEnd();
+ }
+
+ if (clientLog.isLogEnabled())
+ clientLog.logReset(logStreamId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized boolean markSupported() {
+ readStart();
+
+ try {
+ return is.markSupported();
+ }
+ finally {
+ readEnd();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read() throws IOException {
+ readStart();
+
+ int res;
+
+ try {
+ res = is.read();
+ }
+ finally {
+ readEnd();
+ }
+
+ if (res != -1)
+ total++;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
+ readStart();
+
+ int res;
+
+ try {
+ res = is.read(pos, buf, off, len);
+ }
+ finally {
+ readEnd();
+ }
+
+ if (res != -1)
+ total += res;
+
+ if (clientLog.isLogEnabled())
+ clientLog.logRandomRead(logStreamId, pos, res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+ readStart();
+
+ try {
+ is.readFully(pos, buf, off, len);
+ }
+ finally {
+ readEnd();
+ }
+
+ total += len;
+
+ if (clientLog.isLogEnabled())
+ clientLog.logRandomRead(logStreamId, pos, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void readFully(long pos, byte[] buf) throws IOException {
+ readStart();
+
+ try {
+ is.readFully(pos, buf);
+ }
+ finally {
+ readEnd();
+ }
+
+ total += buf.length;
+
+ if (clientLog.isLogEnabled())
+ clientLog.logRandomRead(logStreamId, pos, buf.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void seek(long pos) throws IOException {
+ readStart();
+
+ try {
+ is.seek(pos);
+ }
+ finally {
+ readEnd();
+ }
+
+ if (clientLog.isLogEnabled())
+ clientLog.logSeek(logStreamId, pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized long getPos() throws IOException {
+ readStart();
+
+ try {
+ return is.getPos();
+ }
+ finally {
+ readEnd();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+ readStart();
+
+ try {
+ return is.seekToNewSource(targetPos);
+ }
+ finally {
+ readEnd();
+ }
+ }
+
+ /**
+ * Read start.
+ */
+ private void readStart() {
+ long now = System.nanoTime();
+
+ userTime += now - lastTs;
+
+ lastTs = now;
+ }
+
+ /**
+ * Read end.
+ */
+ private void readEnd() {
+ long now = System.nanoTime();
+
+ readTime += now - lastTs;
+
+ lastTs = now;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
new file mode 100644
index 0000000..9ab552e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.internal.igfs.common.*;
+
+import java.io.*;
+
+/**
+ * Secondary Hadoop file system output stream wrapper.
+ */
+public class HadoopIgfsProxyOutputStream extends OutputStream {
+ /** Actual output stream. */
+ private FSDataOutputStream os;
+
+ /** Client logger. */
+ private final IgfsLogger clientLog;
+
+ /** Log stream ID. */
+ private final long logStreamId;
+
+ /** Read time. */
+ private long writeTime;
+
+ /** User time. */
+ private long userTime;
+
+ /** Last timestamp. */
+ private long lastTs;
+
+ /** Amount of written bytes. */
+ private long total;
+
+ /** Closed flag. */
+ private boolean closed;
+
+ /**
+ * Constructor.
+ *
+ * @param os Actual output stream.
+ * @param clientLog Client logger.
+ * @param logStreamId Log stream ID.
+ */
+ public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) {
+ assert os != null;
+ assert clientLog != null;
+
+ this.os = os;
+ this.clientLog = clientLog;
+ this.logStreamId = logStreamId;
+
+ lastTs = System.nanoTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void write(int b) throws IOException {
+ writeStart();
+
+ try {
+ os.write(b);
+ }
+ finally {
+ writeEnd();
+ }
+
+ total++;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void write(byte[] b) throws IOException {
+ writeStart();
+
+ try {
+ os.write(b);
+ }
+ finally {
+ writeEnd();
+ }
+
+ total += b.length;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
+ writeStart();
+
+ try {
+ os.write(b, off, len);
+ }
+ finally {
+ writeEnd();
+ }
+
+ total += len;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void flush() throws IOException {
+ writeStart();
+
+ try {
+ os.flush();
+ }
+ finally {
+ writeEnd();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void close() throws IOException {
+ if (!closed) {
+ closed = true;
+
+ writeStart();
+
+ try {
+ os.close();
+ }
+ finally {
+ writeEnd();
+ }
+
+ if (clientLog.isLogEnabled())
+ clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
+ }
+ }
+
+ /**
+ * Read start.
+ */
+ private void writeStart() {
+ long now = System.nanoTime();
+
+ userTime += now - lastTs;
+
+ lastTs = now;
+ }
+
+ /**
+ * Read end.
+ */
+ private void writeEnd() {
+ long now = System.nanoTime();
+
+ writeTime += now - lastTs;
+
+ lastTs = now;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsReader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsReader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsReader.java
new file mode 100644
index 0000000..f410fae
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Secondary file system input stream wrapper which actually opens input stream only in case it is explicitly
+ * requested.
+ * <p>
+ * The class is expected to be used only from synchronized context and therefore is not tread-safe.
+ */
+public class HadoopIgfsReader implements IgfsReader {
+ /** Secondary file system. */
+ private final FileSystem fs;
+
+ /** Path to the file to open. */
+ private final Path path;
+
+ /** Buffer size. */
+ private final int bufSize;
+
+ /** Actual input stream. */
+ private FSDataInputStream in;
+
+ /** Cached error occurred during output stream open. */
+ private IOException err;
+
+ /** Flag indicating that the stream was already opened. */
+ private boolean opened;
+
+ /**
+ * Constructor.
+ *
+ * @param fs Secondary file system.
+ * @param path Path to the file to open.
+ * @param bufSize Buffer size.
+ */
+ public HadoopIgfsReader(FileSystem fs, Path path, int bufSize) {
+ assert fs != null;
+ assert path != null;
+
+ this.fs = fs;
+ this.path = path;
+ this.bufSize = bufSize;
+ }
+
+ /** Get input stream. */
+ private PositionedReadable in() throws IOException {
+ if (opened) {
+ if (err != null)
+ throw err;
+ }
+ else {
+ opened = true;
+
+ try {
+ in = fs.open(path, bufSize);
+
+ if (in == null)
+ throw new IOException("Failed to open input stream (file system returned null): " + path);
+ }
+ catch (IOException e) {
+ err = e;
+
+ throw err;
+ }
+ }
+
+ return in;
+ }
+
+ /**
+ * Close wrapped input stream in case it was previously opened.
+ */
+ @Override public void close() {
+ U.closeQuiet(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
+ return in().read(pos, buf, off, len);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
new file mode 100644
index 0000000..54f7377
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+/**
+ * IGFS Hadoop stream descriptor.
+ */
+public class HadoopIgfsStreamDelegate {
+ /** RPC handler. */
+ private final HadoopIgfsEx hadoop;
+
+ /** Target. */
+ private final Object target;
+
+ /** Optional stream length. */
+ private final long len;
+
+ /**
+ * Constructor.
+ *
+ * @param target Target.
+ */
+ public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target) {
+ this(hadoop, target, -1);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param target Target.
+ * @param len Optional length.
+ */
+ public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target, long len) {
+ assert hadoop != null;
+ assert target != null;
+
+ this.hadoop = hadoop;
+ this.target = target;
+ this.len = len;
+ }
+
+ /**
+ * @return RPC handler.
+ */
+ public HadoopIgfsEx hadoop() {
+ return hadoop;
+ }
+
+ /**
+ * @return Stream target.
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T target() {
+ return (T) target;
+ }
+
+ /**
+ * @return Length.
+ */
+ public long length() {
+ return len;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return System.identityHashCode(target);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return obj != null && obj instanceof HadoopIgfsStreamDelegate &&
+ target == ((HadoopIgfsStreamDelegate)obj).target;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopIgfsStreamDelegate.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
new file mode 100644
index 0000000..6b3fa82
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.*;
+
+/**
+ * IGFS input stream event listener.
+ */
+public interface HadoopIgfsStreamEventListener {
+ /**
+ * Callback invoked when the stream is being closed.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onClose() throws IgniteCheckedException;
+
+ /**
+ * Callback invoked when remote error occurs.
+ *
+ * @param errMsg Error message.
+ */
+ public void onError(String errMsg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
new file mode 100644
index 0000000..e30a4ec
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Utility constants and methods for IGFS Hadoop file system.
+ */
+public class HadoopIgfsUtils {
+ /** Parameter name for endpoint no embed mode flag. */
+ public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed";
+
+ /** Parameter name for endpoint no shared memory flag. */
+ public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem";
+
+ /** Parameter name for endpoint no local TCP flag. */
+ public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp";
+
+ /**
+ * Get string parameter.
+ *
+ * @param cfg Configuration.
+ * @param name Parameter name.
+ * @param authority Authority.
+ * @param dflt Default value.
+ * @return String value.
+ */
+ public static String parameter(Configuration cfg, String name, String authority, String dflt) {
+ return cfg.get(String.format(name, authority != null ? authority : ""), dflt);
+ }
+
+ /**
+ * Get integer parameter.
+ *
+ * @param cfg Configuration.
+ * @param name Parameter name.
+ * @param authority Authority.
+ * @param dflt Default value.
+ * @return Integer value.
+ * @throws IOException In case of parse exception.
+ */
+ public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException {
+ String name0 = String.format(name, authority != null ? authority : "");
+
+ try {
+ return cfg.getInt(name0, dflt);
+ }
+ catch (NumberFormatException ignore) {
+ throw new IOException("Failed to parse parameter value to integer: " + name0);
+ }
+ }
+
+ /**
+ * Get boolean parameter.
+ *
+ * @param cfg Configuration.
+ * @param name Parameter name.
+ * @param authority Authority.
+ * @param dflt Default value.
+ * @return Boolean value.
+ */
+ public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) {
+ return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt);
+ }
+
+ /**
+ * Cast Ignite exception to appropriate IO exception.
+ *
+ * @param e Exception to cast.
+ * @return Casted exception.
+ */
+ public static IOException cast(IgniteCheckedException e) {
+ return cast(e, null);
+ }
+
+ /**
+ * Cast Ignite exception to appropriate IO exception.
+ *
+ * @param e Exception to cast.
+ * @param path Path for exceptions.
+ * @return Casted exception.
+ */
+ @SuppressWarnings("unchecked")
+ public static IOException cast(IgniteCheckedException e, @Nullable String path) {
+ assert e != null;
+
+ // First check for any nested IOException; if exists - re-throw it.
+ if (e.hasCause(IOException.class))
+ return e.getCause(IOException.class);
+ else if (e.hasCause(IgfsFileNotFoundException.class))
+ return new FileNotFoundException(path); // TODO: Or PathNotFoundException?
+ else if (e.hasCause(IgfsParentNotDirectoryException.class))
+ return new ParentNotDirectoryException(path);
+ else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class))
+ return new PathIsNotEmptyDirectoryException(path);
+ else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class))
+ return new PathExistsException(path);
+ else
+ return new IOException(e);
+ }
+
+ /**
+ * Constructor.
+ */
+ private HadoopIgfsUtils() {
+ // No-op.
+ }
+}