You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 15:26:30 UTC
[40/68] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented
new class loading architecture for embedded execution mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
deleted file mode 100644
index 014e2a1..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Extended IGFS server interface.
- * <p>
- * Adds stream-level operations (read, write, flush, close) and stream event listener management
- * on top of {@link HadoopIgfs}. Every stream is addressed through its {@link HadoopIgfsStreamDelegate}.
- */
-public interface HadoopIgfsEx extends HadoopIgfs {
- /**
- * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
- * If connection is closed already, callback will be invoked synchronously inside this method.
- *
- * @param delegate Stream delegate.
- * @param lsnr Event listener.
- */
- public void addEventListener(HadoopIgfsStreamDelegate delegate, HadoopIgfsStreamEventListener lsnr);
-
- /**
- * Removes event listener that would be invoked when connection with server is lost or remote error has occurred.
- *
- * @param delegate Stream delegate.
- */
- public void removeEventListener(HadoopIgfsStreamDelegate delegate);
-
- /**
- * Asynchronously reads specified amount of bytes from opened input stream.
- *
- * @param delegate Stream delegate.
- * @param pos Position to read from.
- * @param len Data length to read.
- * @param outBuf Optional output buffer. If buffer length is less than {@code len}, all remaining
- * bytes will be read into a newly allocated buffer of length {@code len - outBuf.length} and this buffer will
- * be the result of read future.
- * @param outOff Output offset.
- * @param outLen Output length.
- * @return Future holding the read data.
- */
- public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
- @Nullable final byte[] outBuf, final int outOff, final int outLen);
-
- /**
- * Writes data to the stream identified by the given delegate. This method does not return any future since
- * no response to write request is sent.
- *
- * @param delegate Stream delegate.
- * @param data Data to write.
- * @param off Offset.
- * @param len Length.
- * @throws IOException If failed.
- */
- public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) throws IOException;
-
- /**
- * Closes server stream.
- *
- * @param delegate Stream delegate.
- * @throws IOException If failed.
- */
- public void closeStream(HadoopIgfsStreamDelegate delegate) throws IOException;
-
- /**
- * Flushes output stream.
- *
- * @param delegate Stream delegate.
- * @throws IOException If failed.
- */
- public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
-
- /**
- * Gets the user this IGFS instance works on behalf of.
- *
- * @return The user name.
- */
- public String user();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
deleted file mode 100644
index 5ff1b2e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * IGFS client future that additionally carries the output buffer parameters
- * (buffer, offset, length) and a flag telling whether it represents a read request.
- *
- * @param <T> Future result type.
- */
-public class HadoopIgfsFuture<T> extends GridFutureAdapter<T> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Buffer the response data is written to; may be {@code null}. */
-    private byte[] outBuf;
-
-    /** Offset inside {@link #outBuf} to start writing from. */
-    private int outOff;
-
-    /** Number of bytes to write into {@link #outBuf}. */
-    private int outLen;
-
-    /** Whether this future belongs to a read request. */
-    private boolean read;
-
-    /**
-     * Gets the output buffer.
-     *
-     * @return Output buffer.
-     */
-    public byte[] outputBuffer() {
-        return this.outBuf;
-    }
-
-    /**
-     * Sets the output buffer.
-     *
-     * @param outBuf Output buffer.
-     */
-    public void outputBuffer(@Nullable byte[] outBuf) {
-        this.outBuf = outBuf;
-    }
-
-    /**
-     * Gets the output offset.
-     *
-     * @return Offset in output buffer to write from.
-     */
-    public int outputOffset() {
-        return this.outOff;
-    }
-
-    /**
-     * Sets the output offset.
-     *
-     * @param outOff Offset in output buffer to write from.
-     */
-    public void outputOffset(int outOff) {
-        this.outOff = outOff;
-    }
-
-    /**
-     * Gets the output length.
-     *
-     * @return Length to write to output buffer.
-     */
-    public int outputLength() {
-        return this.outLen;
-    }
-
-    /**
-     * Sets the output length.
-     *
-     * @param outLen Length to write to output buffer.
-     */
-    public void outputLength(int outLen) {
-        this.outLen = outLen;
-    }
-
-    /**
-     * Sets the read flag.
-     *
-     * @param read {@code True} if this is a read future.
-     */
-    public void read(boolean read) {
-        this.read = read;
-    }
-
-    /**
-     * Gets the read flag.
-     *
-     * @return {@code True} if this is a read future.
-     */
-    public boolean read() {
-        return this.read;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
deleted file mode 100644
index 3220538..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.logging.Log;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsInputStream;
-import org.apache.ignite.igfs.IgfsOutputStream;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathSummary;
-import org.apache.ignite.igfs.IgfsUserContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.lang.IgniteOutClosure;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Communication with grid in the same process.
- */
-public class HadoopIgfsInProc implements HadoopIgfsEx {
- /** Target IGFS. */
- private final IgfsEx igfs;
-
- /** Buffer size. */
- private final int bufSize;
-
- /** Event listeners. */
- private final Map<HadoopIgfsStreamDelegate, HadoopIgfsStreamEventListener> lsnrs =
- new ConcurrentHashMap<>();
-
- /** Logger. */
- private final Log log;
-
- /** The user this Igfs works on behalf of. */
- private final String user;
-
- /**
- * Constructor.
- *
- * @param igfs Target IGFS.
- * @param log Log.
- */
- public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
- this.user = IgfsUtils.fixUserName(userName);
-
- this.igfs = igfs;
-
- this.log = log;
-
- bufSize = igfs.configuration().getBlockSize() * 2;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsHandshakeResponse handshake(final String logDir) {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() {
- @Override public IgfsHandshakeResponse apply() {
- igfs.clientLogDirectory(logDir);
-
- return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
- igfs.globalSampling());
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override public void close(boolean force) {
- // Perform cleanup.
- for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) {
- try {
- lsnr.onClose();
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to notify stream event listener", e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
- @Override public IgfsFile apply() {
- return igfs.info(path);
- }
- });
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to get file info because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
- @Override public IgfsFile apply() {
- return igfs.update(path, props);
- }
- });
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to update file because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException {
- try {
- IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
- @Override public Void apply() {
- igfs.setTimes(path, accessTime, modificationTime);
-
- return null;
- }
- });
-
- return true;
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to set path times because Grid is stopping: " +
- path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException {
- try {
- IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
- @Override public Void apply() {
- igfs.rename(src, dest);
-
- return null;
- }
- });
-
- return true;
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to rename path because Grid is stopping: " + src);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() {
- @Override public Boolean apply() {
- return igfs.delete(path, recursive);
- }
- });
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to delete path because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
- @Override public IgfsStatus call() throws IgniteCheckedException {
- return igfs.globalSpace();
- }
- });
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
- "stopping.");
- }
- catch (IgniteCheckedException | RuntimeException | Error e) {
- throw e;
- }
- catch (Exception e) {
- throw new AssertionError("Must never go there.");
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() {
- @Override public Collection<IgfsPath> apply() {
- return igfs.listPaths(path);
- }
- });
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to list paths because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() {
- @Override public Collection<IgfsFile> apply() {
- return igfs.listFiles(path);
- }
- });
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to list files because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
- try {
- IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
- @Override public Void apply() {
- igfs.mkdirs(path, props);
-
- return null;
- }
- });
-
- return true;
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to create directory because Grid is stopping: " +
- path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() {
- @Override public IgfsPathSummary apply() {
- return igfs.summary(path);
- }
- });
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to get content summary because Grid is stopping: " +
- path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len)
- throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() {
- @Override public Collection<IgfsBlockLocation> apply() {
- return igfs.affinity(path, start, len);
- }
- });
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to get affinity because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
- @Override public HadoopIgfsStreamDelegate apply() {
- IgfsInputStream stream = igfs.open(path, bufSize);
-
- return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length());
- }
- });
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
- throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
- @Override public HadoopIgfsStreamDelegate apply() {
- IgfsInputStream stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
-
- return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length());
- }
- });
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate,
- final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
- @Override public HadoopIgfsStreamDelegate apply() {
- IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
- colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
-
- return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
- }
- });
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to create file because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
- final @Nullable Map<String, String> props) throws IgniteCheckedException {
- try {
- return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
- @Override public HadoopIgfsStreamDelegate apply() {
- IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
-
- return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
- }
- });
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IllegalStateException e) {
- throw new HadoopIgfsCommunicationException("Failed to append file because Grid is stopping: " + path);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
- @Nullable byte[] outBuf, int outOff, int outLen) {
- IgfsInputStream stream = delegate.target();
-
- try {
- byte[] res = null;
-
- if (outBuf != null) {
- int outTailLen = outBuf.length - outOff;
-
- if (len <= outTailLen)
- stream.readFully(pos, outBuf, outOff, len);
- else {
- stream.readFully(pos, outBuf, outOff, outTailLen);
-
- int remainderLen = len - outTailLen;
-
- res = new byte[remainderLen];
-
- stream.readFully(pos, res, 0, remainderLen);
- }
- } else {
- res = new byte[len];
-
- stream.readFully(pos, res, 0, len);
- }
-
- return new GridFinishedFuture<>(res);
- }
- catch (IllegalStateException | IOException e) {
- HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate);
-
- if (lsnr != null)
- lsnr.onError(e.getMessage());
-
- return new GridFinishedFuture<>(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len)
- throws IOException {
- try {
- IgfsOutputStream stream = delegate.target();
-
- stream.write(data, off, len);
- }
- catch (IllegalStateException | IOException e) {
- HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate);
-
- if (lsnr != null)
- lsnr.onError(e.getMessage());
-
- if (e instanceof IllegalStateException)
- throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e);
- else
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
- try {
- IgfsOutputStream stream = delegate.target();
-
- stream.flush();
- }
- catch (IllegalStateException | IOException e) {
- HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate);
-
- if (lsnr != null)
- lsnr.onError(e.getMessage());
-
- if (e instanceof IllegalStateException)
- throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e);
- else
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException {
- Closeable closeable = desc.target();
-
- try {
- closeable.close();
- }
- catch (IllegalStateException e) {
- throw new IOException("Failed to close IGFS stream because Grid is stopping.", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void addEventListener(HadoopIgfsStreamDelegate delegate,
- HadoopIgfsStreamEventListener lsnr) {
- HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr);
-
- assert lsnr0 == null || lsnr0 == lsnr;
-
- if (log.isDebugEnabled())
- log.debug("Added stream event listener [delegate=" + delegate + ']');
- }
-
- /** {@inheritDoc} */
- @Override public void removeEventListener(HadoopIgfsStreamDelegate delegate) {
- HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(delegate);
-
- if (lsnr0 != null && log.isDebugEnabled())
- log.debug("Removed stream event listener [delegate=" + delegate + ']');
- }
-
- /** {@inheritDoc} */
- @Override public String user() {
- return user;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java
deleted file mode 100644
index 46b46d7..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java
+++ /dev/null
@@ -1,629 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * IGFS input stream wrapper for hadoop interfaces.
- */
-@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-public final class HadoopIgfsInputStream extends InputStream implements Seekable, PositionedReadable,
- HadoopIgfsStreamEventListener {
- /** Minimum buffer size. */
- private static final int MIN_BUF_SIZE = 4 * 1024;
-
- /** Server stream delegate. */
- private HadoopIgfsStreamDelegate delegate;
-
- /** Stream ID used by logger. */
- private long logStreamId;
-
- /** Stream position. */
- private long pos;
-
- /** Stream read limit. */
- private long limit;
-
- /** Mark position. */
- private long markPos = -1;
-
- /** Prefetch buffer. */
- private DoubleFetchBuffer buf = new DoubleFetchBuffer();
-
- /** Buffer half size for double-buffering. */
- private int bufHalfSize;
-
- /** Closed flag. */
- private volatile boolean closed;
-
- /** Flag set if stream was closed due to connection breakage. */
- private boolean connBroken;
-
- /** Logger. */
- private Log log;
-
- /** Client logger. */
- private IgfsLogger clientLog;
-
- /** Read time. */
- private long readTime;
-
- /** User time. */
- private long userTime;
-
- /** Last timestamp. */
- private long lastTs;
-
- /** Amount of read bytes. */
- private long total;
-
-    /**
-     * Creates input stream and registers it as event listener for the given delegate.
-     *
-     * @param delegate Server stream delegate.
-     * @param limit Read limit; must be non-negative.
-     * @param bufSize Buffer size; values below {@link #MIN_BUF_SIZE} are raised to it.
-     * @param log Log.
-     * @param clientLog Client logger.
-     * @param logStreamId Stream ID used by logger.
-     */
-    public HadoopIgfsInputStream(HadoopIgfsStreamDelegate delegate, long limit, int bufSize, Log log,
-        IgfsLogger clientLog, long logStreamId) {
-        assert limit >= 0;
-
-        this.delegate = delegate;
-        this.logStreamId = logStreamId;
-        this.clientLog = clientLog;
-        this.log = log;
-        this.limit = limit;
-
-        this.bufHalfSize = bufSize < MIN_BUF_SIZE ? MIN_BUF_SIZE : bufSize;
-
-        this.lastTs = System.nanoTime();
-
-        // Subscribe once all fields have been assigned.
-        delegate.hadoop().addEventListener(delegate, this);
-    }
-
- /**
- * Read start.
- */
- private void readStart() {
- long now = System.nanoTime();
-
- userTime += now - lastTs;
-
- lastTs = now;
- }
-
- /**
- * Read end.
- */
- private void readEnd() {
- long now = System.nanoTime();
-
- readTime += now - lastTs;
-
- lastTs = now;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Returns {@code -1} once the position has reached the read limit. Otherwise returns the value the
- * prefetch buffer reports for the current position and advances the position and the byte counter by one.
- */
- @Override public synchronized int read() throws IOException {
- checkClosed();
-
- readStart();
-
- try {
- if (eof())
- return -1;
-
- // Refresh the buffer for the current position before querying it.
- buf.refreshAhead(pos);
-
- int res = buf.atPosition(pos);
-
- pos++;
- total++;
-
- // Refresh again for the advanced position.
- buf.refreshAhead(pos);
-
- return res;
- }
- catch (IgniteCheckedException e) {
- throw HadoopIgfsUtils.cast(e);
- }
- finally {
- // Time accounting is closed on every exit path, including EOF and failures.
- readEnd();
- }
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Returns {@code -1} when the position has reached the read limit. Bytes are first taken from the
- * prefetch buffer; if that yields fewer than {@code len} bytes and data remains before the limit,
- * the rest is requested from the server and waited for.
- */
- @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException {
- checkClosed();
-
- if (eof())
- return -1;
-
- readStart();
-
- try {
- long remaining = limit - pos;
-
- // Number of bytes the prefetch buffer was able to provide.
- int read = buf.flatten(b, pos, off, len);
-
- pos += read;
- total += read;
- remaining -= read;
-
- if (remaining > 0 && read != len) {
- // Never request more than what is left before the limit.
- int readAmt = (int)Math.min(remaining, len - read);
-
- // Blocks until the read completes; the target region of 'b' starts right after the buffered bytes.
- delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get();
-
- read += readAmt;
- pos += readAmt;
- total += readAmt;
- }
-
- buf.refreshAhead(pos);
-
- return read;
- }
- catch (IgniteCheckedException e) {
- throw HadoopIgfsUtils.cast(e);
- }
- finally {
- readEnd();
- }
- }
-
- /** {@inheritDoc} */
- @Override public synchronized long skip(long n) throws IOException {
- checkClosed();
-
- if (clientLog.isLogEnabled())
- clientLog.logSkip(logStreamId, n);
-
- long oldPos = pos;
-
- if (pos + n <= limit)
- pos += n;
- else
- pos = limit;
-
- buf.refreshAhead(pos);
-
- return pos - oldPos;
- }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * The value is whatever the prefetch buffer reports for the current position.
-     */
-    @Override public synchronized int available() throws IOException {
-        checkClosed();
-
-        final int cnt = buf.available(pos);
-
-        assert cnt >= 0;
-
-        return cnt;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Does nothing if the stream is already closed. Otherwise closes the server stream, writes the
-     * client log record and marks this stream as closed.
-     */
-    @Override public synchronized void close() throws IOException {
-        if (closed)
-            return;
-
-        readStart();
-
-        if (log.isDebugEnabled())
-            log.debug("Closing input stream: " + delegate);
-
-        delegate.hadoop().closeStream(delegate);
-
-        readEnd();
-
-        if (clientLog.isLogEnabled())
-            clientLog.logCloseIn(logStreamId, userTime, readTime, total);
-
-        markClosed(false);
-
-        if (log.isDebugEnabled()) {
-            log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime +
-                ", userTime=" + userTime + ']');
-        }
-    }
-
- /**
- * {@inheritDoc}
- * <p>
- * Remembers the current position. {@code readLimit} is only passed to the client logger.
- */
- @Override public synchronized void mark(int readLimit) {
- markPos = pos;
-
- if (clientLog.isLogEnabled())
- clientLog.logMark(logStreamId, readLimit);
- }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Moves the position back to the marked one.
-     *
-     * @throws IOException If the stream is closed or was never marked.
-     */
-    @Override public synchronized void reset() throws IOException {
-        checkClosed();
-
-        if (clientLog.isLogEnabled())
-            clientLog.logReset(logStreamId);
-
-        final long marked = markPos;
-
-        if (marked == -1)
-            throw new IOException("Stream was not marked.");
-
-        pos = marked;
-
-        buf.refreshAhead(marked);
-    }
-
- /**
- * {@inheritDoc}
- *
- * @return Always {@code true}.
- */
- @Override public boolean markSupported() {
- return true;
- }
-
-    /**
-     * Reads up to {@code len} bytes starting at the given absolute position without moving the stream position.
-     *
-     * @param position Absolute position to read from.
-     * @param buf Destination buffer.
-     * @param off Offset in the destination buffer.
-     * @param len Maximum number of bytes to read.
-     * @return Number of bytes read, or {@code -1} if {@code position} is at or beyond the read limit
-     *      (a zero {@code len} also yields {@code -1}, as before).
-     * @throws IOException If the stream is closed or the read failed.
-     */
-    @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException {
-        long remaining = limit - position;
-
-        // Position at or beyond the limit is EOF. Without this guard a position past the limit produced
-        // a negative length that was handed over to readFully().
-        if (remaining <= 0)
-            return -1;
-
-        int read = (int)Math.min(len, remaining);
-
-        // Nothing was requested: reported as -1, same as the original implementation.
-        if (read == 0)
-            return -1;
-
-        readFully(position, buf, off, read);
-
-        return read;
-    }
-
- /**
- * {@inheritDoc}
- * <p>
- * Reads exactly {@code len} bytes starting at {@code position}. The stream position is not modified;
- * only the byte counter is updated.
- *
- * @throws EOFException If fewer than {@code len} bytes are left between {@code position} and the read limit.
- */
- @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException {
- long remaining = limit - position;
-
- checkClosed();
-
- if (len > remaining)
- throw new EOFException("End of stream reached before data was fully read.");
-
- readStart();
-
- try {
- // 'this.buf' is the prefetch buffer field; the 'buf' parameter is the caller's destination array.
- int read = this.buf.flatten(buf, position, off, len);
-
- total += read;
-
- if (read != len) {
- // The prefetch buffer did not cover the whole range: request the rest from the server and wait.
- int readAmt = len - read;
-
- delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get();
-
- total += readAmt;
- }
-
- if (clientLog.isLogEnabled())
- clientLog.logRandomRead(logStreamId, position, len);
- }
- catch (IgniteCheckedException e) {
- throw HadoopIgfsUtils.cast(e);
- }
- finally {
- readEnd();
- }
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Fills the whole buffer: delegates to the four-argument overload with offset {@code 0} and length
- * {@code buf.length}.
- */
- @Override public void readFully(long position, byte[] buf) throws IOException {
- readFully(position, buf, 0, buf.length);
- }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * A position beyond the read limit is clamped to the limit.
-     */
-    @Override public synchronized void seek(long pos) throws IOException {
-        A.ensure(pos >= 0, "position must be non-negative");
-
-        checkClosed();
-
-        // The client log receives the position exactly as requested.
-        if (clientLog.isLogEnabled())
-            clientLog.logSeek(logStreamId, pos);
-
-        final long newPos = pos > limit ? limit : pos;
-
-        if (log.isDebugEnabled())
-            log.debug("Seek to position [delegate=" + delegate + ", pos=" + newPos + ", oldPos=" + this.pos + ']');
-
-        this.pos = newPos;
-
-        buf.refreshAhead(newPos);
-    }
-
- /**
-  * Returns the current stream position.
-  *
-  * @return Value of the position field, read under the stream monitor.
-  */
- @Override public synchronized long getPos() {
-     return pos;
- }
-
- /**
-  * Switching to another data source is not implemented by this stream.
-  *
-  * @param targetPos Ignored.
-  * @return Always {@code false}.
-  */
- @Override public synchronized boolean seekToNewSource(long targetPos) {
-     return false;
- }
-
- /**
-  * Close callback: marks this stream as closed with the "connection broken" flag set.
-  */
- @Override public void onClose() {
-     markClosed(true);
- }
-
- /**
-  * Error callback; this stream deliberately takes no action.
-  *
-  * @param errMsg Error message (ignored).
-  */
- @Override public void onError(String errMsg) {
-     // No-op.
- }
-
- /**
-  * Flags the stream as closed (first call only) and unregisters its delegate from event notifications.
-  *
-  * @param connBroken {@code True} if connection with server was lost.
-  */
- private void markClosed(boolean connBroken) {
-     // It is ok to have race here.
-     if (closed)
-         return;
-
-     closed = true;
-
-     this.connBroken = connBroken;
-
-     delegate.hadoop().removeEventListener(delegate);
- }
-
- /**
-  * Fails if the stream has been closed, with a message that tells a lost connection from a regular close.
-  *
-  * @throws IOException If the stream is closed.
-  */
- private void checkClosed() throws IOException {
-     if (!closed)
-         return;
-
-     throw new IOException(connBroken ? "Server connection was lost." : "Stream is closed.");
- }
-
- /**
-  * Tells whether the current position equals the stream limit.
-  *
-  * @return {@code True} if end of stream reached.
-  */
- private boolean eof() {
-     return pos == limit;
- }
-
- /**
-  * One asynchronously fetched chunk of the file: the future producing its bytes plus the file region
-  * ({@code pos}, {@code len}) the chunk is expected to cover.
-  */
- private static class FetchBufferPart {
-     /** Future that yields the chunk bytes. */
-     private IgniteInternalFuture<byte[]> readFut;
-
-     /** File offset of the first byte of this chunk. */
-     private long pos;
-
-     /** Requested chunk length; kept separately because the future may not be completed yet. */
-     private int len;
-
-     /**
-      * Creates fetch buffer part.
-      *
-      * @param readFut Read future for this buffer.
-      * @param pos Read position.
-      * @param len Chunk length.
-      */
-     private FetchBufferPart(IgniteInternalFuture<byte[]> readFut, long pos, int len) {
-         this.readFut = readFut;
-         this.pos = pos;
-         this.len = len;
-     }
-
-     /**
-      * Copies bytes from this chunk into {@code dst} when {@code pos} lies inside the chunk's region.
-      * Waits for the read future to complete.
-      *
-      * @param dst Destination buffer.
-      * @param pos Read position in file.
-      * @param dstOff Offset in destination buffer from which start writing.
-      * @param len Maximum number of bytes to copy.
-      * @return Number of bytes copied; {@code 0} if {@code pos} is outside this chunk.
-      * @throws IgniteCheckedException If read future failed.
-      */
-     public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
-         if (!contains(pos))
-             return 0;
-
-         byte[] chunk = readFut.get();
-
-         int from = (int)(pos - this.pos);
-         int cnt = Math.min(len, chunk.length - from);
-
-         U.arrayCopy(chunk, from, dst, dstOff, cnt);
-
-         return cnt;
-     }
-
-     /**
-      * @return {@code True} if the read future has completed.
-      */
-     public boolean ready() {
-         return readFut.isDone();
-     }
-
-     /**
-      * Checks whether the given file position falls into the half-open region {@code [pos, pos + len)}.
-      *
-      * @param pos Position to check.
-      * @return {@code True} if position matches buffer region.
-      */
-     public boolean contains(long pos) {
-         return pos >= this.pos && pos < this.pos + len;
-     }
- }
-
- private class DoubleFetchBuffer {
- /** */
- private FetchBufferPart first;
-
- /** */
- private FetchBufferPart second;
-
- /**
-  * Copies prefetched bytes for the requested region into {@code dst}, taking them from the first chunk
-  * and, if that did not satisfy the request, continuing with the second chunk.
-  *
-  * @param dst Destination buffer.
-  * @param pos Read position in file.
-  * @param dstOff Destination buffer offset.
-  * @param len Maximum number of bytes to copy.
-  * @return Number of bytes copied; {@code 0} if nothing was prefetched.
-  * @throws IgniteCheckedException If any read operation failed.
-  */
- public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
-     assert dstOff >= 0;
-     assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff +
-         ", len=" + len + ']';
-
-     if (first == null)
-         return 0;
-
-     int copied = first.flatten(dst, pos, dstOff, len);
-
-     if (copied == len || second == null)
-         return copied;
-
-     // The second chunk is expected to start right where the first one ends.
-     assert second.pos == first.pos + first.len;
-
-     return copied + second.flatten(dst, pos + copied, dstOff + copied, len - copied);
- }
-
- /**
-  * Returns the byte stored at the given stream position as an unsigned value, waiting for the
-  * owning chunk's read future if necessary.
-  *
-  * @param pos Stream position; asserted to lie in the first or the second chunk.
-  * @return Read byte in range {@code 0..255}.
-  * @throws IgniteCheckedException If read failed.
-  */
- public int atPosition(long pos) throws IgniteCheckedException {
-     // Should not reach here if stream contains no data.
-     assert first != null;
-
-     FetchBufferPart part;
-
-     if (first.contains(pos))
-         part = first;
-     else {
-         assert second != null;
-         assert second.contains(pos);
-
-         part = second;
-     }
-
-     byte[] chunk = part.readFut.get();
-
-     return chunk[(int)(pos - part.pos)] & 0xFF;
- }
-
- /**
- * Starts asynchronous buffer refresh if needed, depending on current position.
- *
- * @param pos Current stream position.
- */
- public void refreshAhead(long pos) {
- if (fullPrefetch(pos)) {
- first = fetch(pos, bufHalfSize);
- second = fetch(pos + bufHalfSize, bufHalfSize);
- }
- else if (needFlip(pos)) {
- first = second;
-
- second = fetch(first.pos + first.len, bufHalfSize);
- }
- }
-
- /**
-  * Estimates how many bytes starting at {@code pos} are already fetched and can be read without
-  * waiting on a read future.
-  * <p>
-  * If {@code pos} lies in the first chunk and that chunk is ready, the result is the rest of the first
-  * chunk plus the whole second chunk when it is ready too. If {@code pos} lies in the second chunk and
-  * it is ready, the result is the rest of the second chunk. Otherwise the result is {@code 0}.
-  *
-  * @param pos Position from which read is expected.
-  * @return Number of bytes available to be read without blocking.
-  */
- public int available(long pos) {
-     if (first == null)
-         return 0;
-
-     long avail = 0;
-
-     if (first.contains(pos)) {
-         if (first.ready()) {
-             // Bytes still ahead of pos in the chunk (the distance from the chunk start to pos would be
-             // the amount already consumed, not the amount available).
-             avail = first.pos + first.len - pos;
-
-             if (second != null && second.ready())
-                 avail += second.len;
-         }
-     }
-     else if (second != null && second.contains(pos) && second.ready())
-         avail = second.pos + second.len - pos;
-
-     return (int)Math.min(Integer.MAX_VALUE, avail);
- }
-
- /**
-  * Decides whether the first chunk can be dropped and the second one promoted in its place.
-  *
-  * @param pos Current position.
-  * @return {@code True} if a second chunk exists and {@code pos} lies inside it.
-  */
- private boolean needFlip(long pos) {
-     if (second == null)
-         return false;
-
-     return second.contains(pos);
- }
-
- /**
-  * Determines if all cached bytes should be discarded and a new region prefetched: nothing has been
-  * fetched yet, the position moved before the first chunk, or it moved to or past the end of the
-  * second chunk.
-  *
-  * @param curPos Current stream position.
-  * @return {@code True} if need to refresh both blocks.
-  */
- private boolean fullPrefetch(long curPos) {
-     // If no data was prefetched yet, return true.
-     if (first == null)
-         return true;
-
-     if (curPos < first.pos)
-         return true;
-
-     return second != null && curPos >= second.pos + second.len;
- }
-
- /**
-  * Starts asynchronous fetch for given region, trimmed so that it does not extend past the stream limit.
-  *
-  * @param pos Position to read from.
-  * @param size Number of bytes to read.
-  * @return Fetch buffer part, or {@code null} if the trimmed region is empty.
-  */
- private FetchBufferPart fetch(long pos, int size) {
-     int cnt = (int)Math.min(size, limit - pos);
-
-     if (cnt <= 0)
-         return null;
-
-     return new FetchBufferPart(delegate.hadoop().readData(delegate, pos, cnt, null, 0, 0), pos, cnt);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java
deleted file mode 100644
index 70f645f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.igfs.common.IgfsMessage;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * IO abstraction layer for IGFS client. Two kinds of messages are expected to be sent: requests with a response
- * and requests without a response.
- */
-public interface HadoopIgfsIo {
- /**
- * Sends given IGFS client message and asynchronously awaits the response.
- *
- * @param msg Message to send.
- * @return Future that will be completed with the response.
- * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed).
- */
- public IgniteInternalFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException;
-
- /**
- * Sends given IGFS client message and asynchronously awaits the response. When IO detects response
- * beginning for given message it stops reading data and passes input stream to closure which can read
- * response in a specific way.
- *
- * @param msg Message to send.
- * @param outBuf Output buffer. If {@code null}, the output buffer is not used.
- * @param outOff Output buffer offset.
- * @param outLen Output buffer length.
- * @param <T> Type of the result the future is completed with.
- * @return Future that will be completed when response is returned from closure.
- * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed).
- */
- public <T> IgniteInternalFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen)
- throws IgniteCheckedException;
-
- /**
- * Sends given message and does not wait for a response.
- *
- * @param msg Message to send.
- * @throws IgniteCheckedException If send failed.
- */
- public void sendPlain(IgfsMessage msg) throws IgniteCheckedException;
-
- /**
- * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
- * If connection is closed already, callback will be invoked synchronously inside this method.
- *
- * @param lsnr Event listener.
- */
- public void addEventListener(HadoopIgfsIpcIoListener lsnr);
-
- /**
- * Removes a previously added event listener, so that it is no longer invoked when connection with server
- * is lost or remote error has occurred.
- *
- * @param lsnr Event listener.
- */
- public void removeEventListener(HadoopIgfsIpcIoListener lsnr);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
deleted file mode 100644
index b0a4135..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
+++ /dev/null
@@ -1,624 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.BufferedOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.commons.logging.Log;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsException;
-import org.apache.ignite.internal.GridLoggerProxy;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.igfs.common.IgfsControlResponse;
-import org.apache.ignite.internal.igfs.common.IgfsDataInputStream;
-import org.apache.ignite.internal.igfs.common.IgfsDataOutputStream;
-import org.apache.ignite.internal.igfs.common.IgfsIpcCommand;
-import org.apache.ignite.internal.igfs.common.IgfsMarshaller;
-import org.apache.ignite.internal.igfs.common.IgfsMessage;
-import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.GridStripedLock;
-import org.apache.ignite.internal.util.ipc.IpcEndpoint;
-import org.apache.ignite.internal.util.ipc.IpcEndpointFactory;
-import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
-import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
-
-/**
- * IO layer implementation based on blocking IPC streams.
- */
-@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-public class HadoopIgfsIpcIo implements HadoopIgfsIo {
- /** Logger. */
- private final Log log;
-
- /** Request futures map. */
- private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =
- new ConcurrentHashMap8<>();
-
- /** Request ID counter. */
- private AtomicLong reqIdCnt = new AtomicLong();
-
- /** Endpoint. */
- private IpcEndpoint endpoint;
-
- /** Endpoint output stream. */
- private IgfsDataOutputStream out;
-
- /** Protocol. */
- private final IgfsMarshaller marsh;
-
- /** Client reader thread. */
- private Thread reader;
-
- /** Lock for graceful shutdown. */
- private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
-
- /** Stopping flag. */
- private volatile boolean stopping;
-
- /** Server endpoint address. */
- private final String endpointAddr;
-
- /** Number of open file system sessions. */
- private final AtomicInteger activeCnt = new AtomicInteger(1);
-
- /** Event listeners. */
- private final Collection<HadoopIgfsIpcIoListener> lsnrs =
- new GridConcurrentHashSet<>();
-
- /** Cached connections. */
- private static final ConcurrentMap<String, HadoopIgfsIpcIo> ipcCache =
- new ConcurrentHashMap8<>();
-
- /** Striped lock that prevents multiple instance creation in {@link #get(Log, String)}. */
- private static final GridStripedLock initLock = new GridStripedLock(32);
-
- /**
-  * Creates an IO instance bound to the given endpoint address. No connection is opened here.
-  *
-  * @param endpointAddr Endpoint address; asserted non-null.
-  * @param marsh Protocol marshaller; asserted non-null.
-  * @param log Logger to use.
-  */
- public HadoopIgfsIpcIo(String endpointAddr, IgfsMarshaller marsh, Log log) {
-     assert endpointAddr != null;
-     assert marsh != null;
-
-     this.log = log;
-     this.marsh = marsh;
-     this.endpointAddr = endpointAddr;
- }
-
- /**
- * Returns a started and valid instance of this class
- * for a given endpoint.
- * <p>
- * A cached instance is reused if its usage count can be incremented; otherwise a new instance is created
- * and started while holding the striped lock selected by the endpoint string.
- *
- * @param log Logger to use for new instance.
- * @param endpoint Endpoint string.
- * @return New or existing cached instance, which is started and operational.
- * @throws IOException If new instance was created but failed to start.
- */
- public static HadoopIgfsIpcIo get(Log log, String endpoint) throws IOException {
- // Loop until either a cached instance is acquired or a new one is created.
- while (true) {
- HadoopIgfsIpcIo clientIo = ipcCache.get(endpoint);
-
- if (clientIo != null) {
- if (clientIo.acquire())
- return clientIo;
- else
- // If concurrent close.
- // Conditional remove: only evicts the mapping if it still points to this stale instance.
- ipcCache.remove(endpoint, clientIo);
- }
- else {
- Lock lock = initLock.getLock(endpoint);
-
- lock.lock();
-
- try {
- clientIo = ipcCache.get(endpoint);
-
- if (clientIo != null) { // Perform double check.
- if (clientIo.acquire())
- return clientIo;
- else
- // If concurrent close.
- ipcCache.remove(endpoint, clientIo);
- }
-
- // Otherwise try creating a new one.
- clientIo = new HadoopIgfsIpcIo(endpoint, new IgfsMarshaller(), log);
-
- try {
- clientIo.start();
- }
- catch (IgniteCheckedException e) {
- // Callers of this method deal in IOException; the original exception is kept as the cause.
- throw new IOException(e.getMessage(), e);
- }
-
- HadoopIgfsIpcIo old = ipcCache.putIfAbsent(endpoint, clientIo);
-
- // Put in exclusive lock.
- assert old == null;
-
- return clientIo;
- }
- finally {
- lock.unlock();
- }
- }
- }
- }
-
- /**
-  * Tries to increment the usage count of this instance. The increment is refused once the count has
-  * dropped to zero.
-  *
-  * @return {@code true} if the count was incremented, {@code false} if it was already zero.
-  */
- private boolean acquire() {
-     for (;;) {
-         int cur = activeCnt.get();
-
-         if (cur != 0) {
-             // Need to make sure that no-one changed the count in between; retry if someone did.
-             if (!activeCnt.compareAndSet(cur, cur + 1))
-                 continue;
-
-             if (log.isDebugEnabled())
-                 log.debug("IPC IO acquired: " + this);
-
-             return true;
-         }
-
-         if (log.isDebugEnabled())
-             log.debug("IPC IO not acquired (count was 0): " + this);
-
-         return false;
-     }
- }
-
- /**
- * Releases this instance, decrementing usage count.
- * <p>
- * If usage count becomes zero, the instance is stopped
- * and removed from cache.
- */
- public void release() {
- while (true) {
- int cnt = activeCnt.get();
-
- if (cnt == 0) {
- if (log.isDebugEnabled())
- log.debug("IPC IO not released (count was 0): " + this);
-
- return;
- }
-
- if (activeCnt.compareAndSet(cnt, cnt - 1)) {
- if (cnt == 1) {
- ipcCache.remove(endpointAddr, this);
-
- if (log.isDebugEnabled())
- log.debug("IPC IO stopping as unused: " + this);
-
- stop();
- }
- else if (log.isDebugEnabled())
- log.debug("IPC IO released: " + this);
-
- return;
- }
- }
- }
-
- /**
- * Closes this IO instance, removing it from cache.
- */
- public void forceClose() {
- if (ipcCache.remove(endpointAddr, this))
- stop();
- }
-
- /**
-  * Connects to the endpoint, wraps its output stream and launches the daemon reader thread.
-  * If any step fails, {@code stop()} is invoked before the exception propagates.
-  *
-  * @throws IgniteCheckedException If failed to connect the endpoint. When the failure is caused by
-  *      {@code IpcOutOfSystemResourcesException}, a new exception carrying the dedicated
-  *      out-of-resources message is thrown instead.
-  */
- private void start() throws IgniteCheckedException {
-     boolean started = false;
-
-     try {
-         endpoint = IpcEndpointFactory.connectEndpoint(
-             endpointAddr, new GridLoggerProxy(new HadoopIgfsJclLogger(log), null, null, ""));
-
-         out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream()));
-
-         reader = new ReaderThread();
-
-         // Required for Hadoop 2.x
-         reader.setDaemon(true);
-         reader.start();
-
-         started = true;
-     }
-     catch (IgniteCheckedException e) {
-         IpcOutOfSystemResourcesException cause = e.getCause(IpcOutOfSystemResourcesException.class);
-
-         throw cause == null ? e :
-             new IgniteCheckedException(IpcSharedMemoryServerEndpoint.OUT_OF_RESOURCES_MSG, cause);
-     }
-     finally {
-         if (!started)
-             stop();
-     }
- }
-
- /**
-  * Shuts down the IO. No send requests will be accepted anymore, all pending futures will be failed.
-  * Close listeners will be invoked as if connection is closed by server. Afterwards the reader thread,
-  * if one exists, is interrupted and joined.
-  */
- private void stop() {
-     close0(null);
-
-     if (reader == null)
-         return;
-
-     try {
-         U.interrupt(reader);
-         U.join(reader);
-
-         reader = null;
-     }
-     catch (IgniteInterruptedCheckedException ignored) {
-         // Keep the interrupt visible to the caller.
-         Thread.currentThread().interrupt();
-
-         log.warn("Got interrupted while waiting for reader thread to shut down (will return).");
-     }
- }
-
- /**
- * Registers a listener, or invokes its {@code onClose()} right away when this IO is stopping or the
- * read lock cannot be obtained (the write lock is taken only while the stopping flag is being set).
- *
- * @param lsnr Event listener.
- */
- @Override public void addEventListener(HadoopIgfsIpcIoListener lsnr) {
- if (!busyLock.readLock().tryLock()) {
- lsnr.onClose();
-
- return;
- }
-
- boolean invokeNow = false;
-
- try {
- invokeNow = stopping;
-
- if (!invokeNow)
- lsnrs.add(lsnr);
- }
- finally {
- busyLock.readLock().unlock();
-
- // The callback is invoked only after the read lock has been released.
- if (invokeNow)
- lsnr.onClose();
- }
- }
-
- /**
-  * Unregisters the given listener; no locking is involved.
-  *
-  * @param lsnr Event listener.
-  */
- @Override public void removeEventListener(HadoopIgfsIpcIoListener lsnr) {
-     lsnrs.remove(lsnr);
- }
-
- /**
-  * Sends a message without supplying an output buffer.
-  *
-  * @param msg Message to send.
-  * @return Future for the response.
-  * @throws IgniteCheckedException If the message cannot be sent.
-  */
- @Override public IgniteInternalFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException {
-     final byte[] noBuf = null;
-
-     return send(msg, noBuf, 0, 0);
- }
-
- /**
- * Registers a response future under a fresh request ID, then marshals and flushes the message to the
- * endpoint output stream. The output buffer parameters are stored on the future for the reader thread.
- *
- * @param msg Message to send.
- * @param outBuf Output buffer; asserted to be {@code null} unless the command is {@code READ_BLOCK}.
- * @param outOff Output buffer offset.
- * @param outLen Output buffer length.
- * @return Future for the response. If writing failed, the returned future is already completed with the error.
- * @throws IgniteCheckedException If the client is stopping or being concurrently closed.
- */
- @Override public <T> IgniteInternalFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff,
- int outLen) throws IgniteCheckedException {
- assert outBuf == null || msg.command() == IgfsIpcCommand.READ_BLOCK;
-
- // The read lock is unavailable while close0() holds the write lock to set the stopping flag.
- if (!busyLock.readLock().tryLock())
- throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " +
- "closed).");
-
- try {
- if (stopping)
- throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " +
- "closed).");
-
- long reqId = reqIdCnt.getAndIncrement();
-
- HadoopIgfsFuture<T> fut = new HadoopIgfsFuture<>();
-
- fut.outputBuffer(outBuf);
- fut.outputOffset(outOff);
- fut.outputLength(outLen);
- fut.read(msg.command() == IgfsIpcCommand.READ_BLOCK);
-
- // Register the future before writing so that it is already in the map when a response arrives.
- HadoopIgfsFuture oldFut = reqMap.putIfAbsent(reqId, fut);
-
- assert oldFut == null;
-
- if (log.isDebugEnabled())
- log.debug("Sending IGFS message [reqId=" + reqId + ", msg=" + msg + ']');
-
- byte[] hdr = IgfsMarshaller.createHeader(reqId, msg.command());
-
- IgniteCheckedException err = null;
-
- try {
- // Writes of one message are serialized on this instance's monitor.
- synchronized (this) {
- marsh.marshall(msg, hdr, out);
-
- out.flush(); // Blocking operation + sometimes system call.
- }
- }
- catch (IgniteCheckedException e) {
- err = e;
- }
- catch (IOException e) {
- err = new HadoopIgfsCommunicationException(e);
- }
-
- // A write failure is reported through the future rather than thrown.
- if (err != null) {
- reqMap.remove(reqId, fut);
-
- fut.onDone(err);
- }
-
- return fut;
- }
- finally {
- busyLock.readLock().unlock();
- }
- }
-
- /**
- * Writes a {@code WRITE_BLOCK} message directly to the output stream: a header followed by the data
- * bytes. No response future is registered.
- *
- * @param msg Message to send; asserted to be a {@code WRITE_BLOCK} command and cast to
- *      {@code IgfsStreamControlRequest}.
- * @throws IgniteCheckedException If the client is stopping or being concurrently closed, or if writing failed.
- */
- @Override public void sendPlain(IgfsMessage msg) throws IgniteCheckedException {
- if (!busyLock.readLock().tryLock())
- throw new HadoopIgfsCommunicationException("Failed to send message (client is being " +
- "concurrently closed).");
-
- try {
- if (stopping)
- throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently closed).");
-
- assert msg.command() == IgfsIpcCommand.WRITE_BLOCK;
-
- IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg;
-
- // Request ID -1 is used for messages that do not expect a response.
- byte[] hdr = IgfsMarshaller.createHeader(-1, IgfsIpcCommand.WRITE_BLOCK);
-
- // Stream ID goes to header offset 12 and data length to offset 20.
- U.longToBytes(req.streamId(), hdr, 12);
- U.intToBytes(req.length(), hdr, 20);
-
- // Header and payload are written under the instance monitor, the same one send() uses.
- synchronized (this) {
- out.write(hdr);
- out.write(req.data(), (int)req.position(), req.length());
-
- out.flush();
- }
- }
- catch (IOException e) {
- throw new HadoopIgfsCommunicationException(e);
- }
- finally {
- busyLock.readLock().unlock();
- }
- }
-
- /**
- * Closes client but does not wait.
- * <p>
- * Only the first call has an effect: it sets the stopping flag, closes the output stream and the
- * endpoint, completes all pending request futures with the error, and notifies the close listeners.
- *
- * @param err Error to fail pending futures with; if {@code null}, a generic "connection was concurrently
- *      closed" error is used.
- */
- private void close0(@Nullable Throwable err) {
- // Taking the write lock waits for operations that currently hold the read lock.
- busyLock.writeLock().lock();
-
- try {
- if (stopping)
- return;
-
- stopping = true;
- }
- finally {
- busyLock.writeLock().unlock();
- }
-
- if (err == null)
- err = new IgniteCheckedException("Failed to perform request (connection was concurrently closed before response " +
- "is received).");
-
- // Clean up resources.
- U.closeQuiet(out);
-
- if (endpoint != null)
- endpoint.close();
-
- // Unwind futures. We can safely iterate here because no more futures will be added.
- Iterator<HadoopIgfsFuture> it = reqMap.values().iterator();
-
- while (it.hasNext()) {
- HadoopIgfsFuture fut = it.next();
-
- fut.onDone(err);
-
- it.remove();
- }
-
- for (HadoopIgfsIpcIoListener lsnr : lsnrs)
- lsnr.onClose();
- }
-
- /**
- * Thread that reads responses from the endpoint input stream and completes the matching request futures.
- * When the loop ends for any reason, {@code close0()} is called with the last recorded error.
- * <p>
- * Do not extend {@code GridThread} to minimize class dependencies.
- */
- private class ReaderThread extends Thread {
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void run() {
- // Error to fail pending futures.
- Throwable err = null;
-
- try {
- InputStream in = endpoint.inputStream();
-
- IgfsDataInputStream dis = new IgfsDataInputStream(in);
-
- // Both header arrays are allocated once and reused for every response.
- byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE];
- byte[] msgHdr = new byte[IgfsControlResponse.RES_HEADER_SIZE];
-
- while (!Thread.currentThread().isInterrupted()) {
- dis.readFully(hdr);
-
- // Request ID is the long at header offset 0.
- long reqId = U.bytesToLong(hdr, 0);
-
- // We don't wait for write responses, therefore reqId is -1.
- if (reqId == -1) {
- // We received a response which normally should not be sent. It must contain an error.
- dis.readFully(msgHdr);
-
- // Byte 4 of the response header is the error flag.
- assert msgHdr[4] != 0;
-
- String errMsg = dis.readUTF();
-
- // Error code.
- dis.readInt();
-
- long streamId = dis.readLong();
-
- for (HadoopIgfsIpcIoListener lsnr : lsnrs)
- lsnr.onError(streamId, errMsg);
- }
- else {
- HadoopIgfsFuture<Object> fut = reqMap.remove(reqId);
-
- if (fut == null) {
- String msg = "Failed to read response from server: response closure is unavailable for " +
- "requestId (will close connection):" + reqId;
-
- log.warn(msg);
-
- err = new IgniteCheckedException(msg);
-
- break;
- }
- else {
- try {
- // Command code is the int at header offset 8.
- IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(U.bytesToInt(hdr, 8));
-
- if (log.isDebugEnabled())
- log.debug("Received IGFS response [reqId=" + reqId + ", cmd=" + cmd + ']');
-
- Object res = null;
-
- // Futures flagged as "read" (created for READ_BLOCK requests) carry raw block data.
- if (fut.read()) {
- dis.readFully(msgHdr);
-
- boolean hasErr = msgHdr[4] != 0;
-
- if (hasErr) {
- String errMsg = dis.readUTF();
-
- // Error code.
- Integer errCode = dis.readInt();
-
- IgfsControlResponse.throwError(errCode, errMsg);
- }
-
- // Block length is the int at response header offset 5.
- int blockLen = U.bytesToInt(msgHdr, 5);
-
- int readLen = Math.min(blockLen, fut.outputLength());
-
- // First fill the caller-supplied buffer, up to its declared length.
- if (readLen > 0) {
- assert fut.outputBuffer() != null;
-
- dis.readFully(fut.outputBuffer(), fut.outputOffset(), readLen);
- }
-
- // Any bytes that did not go to the caller's buffer are returned as the future result.
- if (readLen != blockLen) {
- byte[] buf = new byte[blockLen - readLen];
-
- dis.readFully(buf);
-
- res = buf;
- }
- }
- else
- res = marsh.unmarshall(cmd, hdr, dis);
-
- fut.onDone(res);
- }
- catch (IgfsException | IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to apply response closure (will fail request future): " +
- e.getMessage());
-
- fut.onDone(e);
-
- // NOTE(review): the error is recorded but the loop is not left here, so reading continues and
- // this value may later be passed to close0() - confirm this is intended.
- err = e;
- }
- catch (Throwable t) {
- fut.onDone(t);
-
- throw t;
- }
- }
- }
- }
- }
- catch (EOFException ignored) {
- err = new IgniteCheckedException("Failed to read response from server (connection was closed by remote peer).");
- }
- catch (IOException e) {
- // Read failures are not logged while the client is stopping.
- if (!stopping)
- log.error("Failed to read data (connection will be closed)", e);
-
- err = new HadoopIgfsCommunicationException(e);
- }
- catch (Throwable e) {
- if (!stopping)
- log.error("Failed to obtain endpoint input stream (connection will be closed)", e);
-
- err = e;
-
- if (e instanceof Error)
- throw (Error)e;
- }
- finally {
- close0(err);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return getClass().getSimpleName() + " [endpointAddr=" + endpointAddr + ", activeCnt=" + activeCnt +
- ", stopping=" + stopping + ']';
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java
deleted file mode 100644
index c26e896..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-/**
- * Listens to the events of {@link HadoopIgfsIpcIo}: closing of the IO and errors reported by the remote side.
- */
-public interface HadoopIgfsIpcIoListener {
- /**
- * Callback invoked when the IO is being closed.
- */
- public void onClose();
-
- /**
- * Callback invoked when a remote error occurs.
- *
- * @param streamId ID of the stream the error relates to.
- * @param errMsg Error message.
- */
- public void onError(long streamId, String errMsg);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java
deleted file mode 100644
index 3a7f45b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * JCL logger wrapper for Hadoop.
- */
-public class HadoopIgfsJclLogger implements IgniteLogger {
- /** JCL implementation proxy. */
- @GridToStringInclude
- private Log impl;
-
- /**
- * Constructor.
- *
- * @param impl JCL implementation to use.
- */
- HadoopIgfsJclLogger(Log impl) {
- assert impl != null;
-
- this.impl = impl;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogger getLogger(Object ctgr) {
- return new HadoopIgfsJclLogger(LogFactory.getLog(
- ctgr instanceof Class ? ((Class)ctgr).getName() : String.valueOf(ctgr)));
- }
-
- /** {@inheritDoc} */
- @Override public void trace(String msg) {
- impl.trace(msg);
- }
-
- /** {@inheritDoc} */
- @Override public void debug(String msg) {
- impl.debug(msg);
- }
-
- /** {@inheritDoc} */
- @Override public void info(String msg) {
- impl.info(msg);
- }
-
- /** {@inheritDoc} */
- @Override public void warning(String msg) {
- impl.warn(msg);
- }
-
- /** {@inheritDoc} */
- @Override public void warning(String msg, @Nullable Throwable e) {
- impl.warn(msg, e);
- }
-
- /** {@inheritDoc} */
- @Override public void error(String msg) {
- impl.error(msg);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isQuiet() {
- return !isInfoEnabled() && !isDebugEnabled();
- }
-
- /** {@inheritDoc} */
- @Override public void error(String msg, @Nullable Throwable e) {
- impl.error(msg, e);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isTraceEnabled() {
- return impl.isTraceEnabled();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isDebugEnabled() {
- return impl.isDebugEnabled();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isInfoEnabled() {
- return impl.isInfoEnabled();
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public String fileName() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopIgfsJclLogger.class, this);
- }
-}
\ No newline at end of file