Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/29 02:25:21 UTC
[05/14] spark git commit: [SPARK-13529][BUILD] Move network/* modules into common/network-*
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java b/network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
deleted file mode 100644
index 431cb67..0000000
--- a/network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.sasl;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.base64.Base64;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A SASL Server for Spark which simply keeps track of the state of a single SASL session, from the
- * initial state to the "authenticated" state. (It is not a server in the sense of accepting
- * connections on some socket.)
- */
-public class SparkSaslServer implements SaslEncryptionBackend {
- private final Logger logger = LoggerFactory.getLogger(SparkSaslServer.class);
-
- /**
- * This is passed as the server name when creating the SASL client/server.
- * This could be changed to be configurable in the future.
- */
- static final String DEFAULT_REALM = "default";
-
- /**
- * The authentication mechanism used here is DIGEST-MD5. This could be changed to be
- * configurable in the future.
- */
- static final String DIGEST = "DIGEST-MD5";
-
- /**
- * Quality of protection value that includes encryption.
- */
- static final String QOP_AUTH_CONF = "auth-conf";
-
- /**
- * Quality of protection value that does not include encryption.
- */
- static final String QOP_AUTH = "auth";
-
- /** Identifier for a certain secret key within the secretKeyHolder. */
- private final String secretKeyId;
- private final SecretKeyHolder secretKeyHolder;
- private SaslServer saslServer;
-
- public SparkSaslServer(
- String secretKeyId,
- SecretKeyHolder secretKeyHolder,
- boolean alwaysEncrypt) {
- this.secretKeyId = secretKeyId;
- this.secretKeyHolder = secretKeyHolder;
-
- // Sasl.QOP is a comma-separated list of supported values. The value that allows encryption
- // is listed first since it's preferred over the non-encrypted one (if the client also
- // lists both in the request).
- String qop = alwaysEncrypt ? QOP_AUTH_CONF : String.format("%s,%s", QOP_AUTH_CONF, QOP_AUTH);
- Map<String, String> saslProps = ImmutableMap.<String, String>builder()
- .put(Sasl.SERVER_AUTH, "true")
- .put(Sasl.QOP, qop)
- .build();
- try {
- this.saslServer = Sasl.createSaslServer(DIGEST, null, DEFAULT_REALM, saslProps,
- new DigestCallbackHandler());
- } catch (SaslException e) {
- throw Throwables.propagate(e);
- }
- }
-
- /**
- * Determines whether the authentication exchange has completed successfully.
- */
- public synchronized boolean isComplete() {
- return saslServer != null && saslServer.isComplete();
- }
-
- /** Returns the value of a negotiated property. */
- public Object getNegotiatedProperty(String name) {
- return saslServer.getNegotiatedProperty(name);
- }
-
- /**
- * Used to respond to client SASL tokens.
- * @param token Client's SASL token
- * @return response to send back to the client.
- */
- public synchronized byte[] response(byte[] token) {
- try {
- return saslServer != null ? saslServer.evaluateResponse(token) : new byte[0];
- } catch (SaslException e) {
- throw Throwables.propagate(e);
- }
- }
-
- /**
- * Disposes of any system resources or security-sensitive information the
- * SaslServer might be using.
- */
- @Override
- public synchronized void dispose() {
- if (saslServer != null) {
- try {
- saslServer.dispose();
- } catch (SaslException e) {
- // ignore
- } finally {
- saslServer = null;
- }
- }
- }
-
- @Override
- public byte[] wrap(byte[] data, int offset, int len) throws SaslException {
- return saslServer.wrap(data, offset, len);
- }
-
- @Override
- public byte[] unwrap(byte[] data, int offset, int len) throws SaslException {
- return saslServer.unwrap(data, offset, len);
- }
-
- /**
- * Implementation of javax.security.auth.callback.CallbackHandler for SASL DIGEST-MD5 mechanism.
- */
- private class DigestCallbackHandler implements CallbackHandler {
- @Override
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
- for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- logger.trace("SASL server callback: setting username");
- NameCallback nc = (NameCallback) callback;
- nc.setName(encodeIdentifier(secretKeyHolder.getSaslUser(secretKeyId)));
- } else if (callback instanceof PasswordCallback) {
- logger.trace("SASL server callback: setting password");
- PasswordCallback pc = (PasswordCallback) callback;
- pc.setPassword(encodePassword(secretKeyHolder.getSecretKey(secretKeyId)));
- } else if (callback instanceof RealmCallback) {
- logger.trace("SASL server callback: setting realm");
- RealmCallback rc = (RealmCallback) callback;
- rc.setText(rc.getDefaultText());
- } else if (callback instanceof AuthorizeCallback) {
- AuthorizeCallback ac = (AuthorizeCallback) callback;
- String authId = ac.getAuthenticationID();
- String authzId = ac.getAuthorizationID();
- ac.setAuthorized(authId.equals(authzId));
- if (ac.isAuthorized()) {
- ac.setAuthorizedID(authzId);
- }
- logger.debug("SASL Authorization complete, authorized set to {}", ac.isAuthorized());
- } else {
- throw new UnsupportedCallbackException(callback, "Unrecognized SASL DIGEST-MD5 Callback");
- }
- }
- }
- }
-
- /** Encodes a String identifier as a Base64-encoded string. */
- public static String encodeIdentifier(String identifier) {
- Preconditions.checkNotNull(identifier, "User cannot be null if SASL is enabled");
- return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(Charsets.UTF_8)))
- .toString(Charsets.UTF_8);
- }
-
- /** Encodes a password as a Base64-encoded char[]. */
- public static char[] encodePassword(String password) {
- Preconditions.checkNotNull(password, "Password cannot be null if SASL is enabled");
- return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(Charsets.UTF_8)))
- .toString(Charsets.UTF_8).toCharArray();
- }
-}
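
For readers following this move, a minimal sketch of how the class above is driven. The
SecretKeyHolder body is illustrative; the only assumption is that it exposes the two methods
this file actually calls (getSaslUser and getSecretKey), and the handshake loop is schematic:

    SecretKeyHolder keys = new SecretKeyHolder() {
      @Override
      public String getSaslUser(String appId) { return "sparkSaslUser"; }

      @Override
      public String getSecretKey(String appId) { return "sharedSecret"; }
    };

    // alwaysEncrypt = false advertises "auth-conf,auth", letting the client pick.
    SparkSaslServer server = new SparkSaslServer("app-1", keys, false);
    byte[] challenge = server.response(new byte[0]);  // initial DIGEST-MD5 challenge
    // ... send the challenge to the client, feed each client token back through
    // server.response(token) until server.isComplete() returns true ...
    server.dispose();  // always release SASL state when finished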
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java b/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java
deleted file mode 100644
index 4a1f28e..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import org.apache.spark.network.protocol.Message;
-
-/**
- * Handles either request or response messages coming off of Netty. A MessageHandler instance
- * is associated with a single Netty Channel (though it may have multiple clients on the same
- * Channel.)
- */
-public abstract class MessageHandler<T extends Message> {
- /** Handles the receipt of a single message. */
- public abstract void handle(T message) throws Exception;
-
- /** Invoked when the channel this MessageHandler is on is active. */
- public abstract void channelActive();
-
- /** Invoked when an exception was caught on the Channel. */
- public abstract void exceptionCaught(Throwable cause);
-
- /** Invoked when the channel this MessageHandler is on is inactive. */
- public abstract void channelInactive();
-}
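
The contract above is small enough to show concretely. A hedged sketch of the minimal
concrete subclass (the real implementations in this commit are TransportRequestHandler and
TransportResponseHandler):

    public class LoggingMessageHandler extends MessageHandler<Message> {
      @Override
      public void handle(Message message) { System.out.println("received " + message); }

      @Override
      public void channelActive() { System.out.println("channel active"); }

      @Override
      public void exceptionCaught(Throwable cause) { cause.printStackTrace(); }

      @Override
      public void channelInactive() { System.out.println("channel inactive"); }
    }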
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
deleted file mode 100644
index 6ed61da..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.nio.ByteBuffer;
-
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-
-/** An RpcHandler suitable for a client-only TransportContext, which cannot receive RPCs. */
-public class NoOpRpcHandler extends RpcHandler {
- private final StreamManager streamManager;
-
- public NoOpRpcHandler() {
- streamManager = new OneForOneStreamManager();
- }
-
- @Override
- public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
- throw new UnsupportedOperationException("Cannot handle messages");
- }
-
- @Override
- public StreamManager getStreamManager() { return streamManager; }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
deleted file mode 100644
index ea9e735..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Preconditions;
-import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.client.TransportClient;
-
-/**
- * StreamManager which allows registration of an Iterator<ManagedBuffer>, whose buffers are
- * individually fetched as chunks by the client. Each registered buffer is one chunk.
- */
-public class OneForOneStreamManager extends StreamManager {
- private final Logger logger = LoggerFactory.getLogger(OneForOneStreamManager.class);
-
- private final AtomicLong nextStreamId;
- private final ConcurrentHashMap<Long, StreamState> streams;
-
- /** State of a single stream. */
- private static class StreamState {
- final String appId;
- final Iterator<ManagedBuffer> buffers;
-
- // The channel associated to the stream
- Channel associatedChannel = null;
-
- // Used to keep track of the index of the buffer that the user has retrieved, just to ensure
- // that the caller only requests each chunk one at a time, in order.
- int curChunk = 0;
-
- StreamState(String appId, Iterator<ManagedBuffer> buffers) {
- this.appId = appId;
- this.buffers = Preconditions.checkNotNull(buffers);
- }
- }
-
- public OneForOneStreamManager() {
- // For debugging purposes, start with a random stream id to help identify different streams.
- // This does not need to be globally unique, only unique to this class.
- nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
- streams = new ConcurrentHashMap<Long, StreamState>();
- }
-
- @Override
- public void registerChannel(Channel channel, long streamId) {
- if (streams.containsKey(streamId)) {
- streams.get(streamId).associatedChannel = channel;
- }
- }
-
- @Override
- public ManagedBuffer getChunk(long streamId, int chunkIndex) {
- StreamState state = streams.get(streamId);
- if (chunkIndex != state.curChunk) {
- throw new IllegalStateException(String.format(
- "Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk));
- } else if (!state.buffers.hasNext()) {
- throw new IllegalStateException(String.format(
- "Requested chunk index beyond end %s", chunkIndex));
- }
- state.curChunk += 1;
- ManagedBuffer nextChunk = state.buffers.next();
-
- if (!state.buffers.hasNext()) {
- logger.trace("Removing stream id {}", streamId);
- streams.remove(streamId);
- }
-
- return nextChunk;
- }
-
- @Override
- public void connectionTerminated(Channel channel) {
- // Close all streams which have been associated with the channel.
- for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
- StreamState state = entry.getValue();
- if (state.associatedChannel == channel) {
- streams.remove(entry.getKey());
-
- // Release all remaining buffers.
- while (state.buffers.hasNext()) {
- state.buffers.next().release();
- }
- }
- }
- }
-
- @Override
- public void checkAuthorization(TransportClient client, long streamId) {
- if (client.getClientId() != null) {
- StreamState state = streams.get(streamId);
- Preconditions.checkArgument(state != null, "Unknown stream ID.");
- if (!client.getClientId().equals(state.appId)) {
- throw new SecurityException(String.format(
- "Client %s not authorized to read stream %d (app %s).",
- client.getClientId(),
- streamId,
- state.appId));
- }
- }
- }
-
- /**
- * Registers a stream of ManagedBuffers which are served as individual chunks one at a time to
- * callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a
- * client connection is closed before the iterator is fully drained, then the remaining buffers
- * will all be release()'d.
- *
- * If an app ID is provided, only callers who've authenticated with the given app ID will be
- * allowed to fetch from this stream.
- */
- public long registerStream(String appId, Iterator<ManagedBuffer> buffers) {
- long myStreamId = nextStreamId.getAndIncrement();
- streams.put(myStreamId, new StreamState(appId, buffers));
- return myStreamId;
- }
-
-}
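
A short usage sketch of the class above: register two in-memory chunks, then fetch them in
order. NioManagedBuffer wrapping a ByteBuffer is assumed from its use elsewhere in this
commit (TransportRequestHandler):

    OneForOneStreamManager manager = new OneForOneStreamManager();
    Iterator<ManagedBuffer> chunks = Arrays.<ManagedBuffer>asList(
        new NioManagedBuffer(ByteBuffer.wrap(new byte[] { 1, 2, 3 })),
        new NioManagedBuffer(ByteBuffer.wrap(new byte[] { 4, 5, 6 }))).iterator();
    long streamId = manager.registerStream("app-1", chunks);

    ManagedBuffer first = manager.getChunk(streamId, 0);   // chunks must be fetched in order
    ManagedBuffer second = manager.getChunk(streamId, 1);  // stream is removed after its last chunk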
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
deleted file mode 100644
index a99c301..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.nio.ByteBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-
-/**
- * Handler for sendRPC() messages sent by {@link org.apache.spark.network.client.TransportClient}s.
- */
-public abstract class RpcHandler {
-
- private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
-
- /**
- * Receive a single RPC message. Any exception thrown while in this method will be sent back to
- * the client in string form as a standard RPC failure.
- *
- * This method will not be called in parallel for a single TransportClient (i.e., channel).
- *
- * @param client A channel client which enables the handler to make requests back to the sender
- * of this RPC. This will always be the exact same object for a particular channel.
- * @param message The serialized bytes of the RPC.
- * @param callback Callback which should be invoked exactly once upon success or failure of the
- * RPC.
- */
- public abstract void receive(
- TransportClient client,
- ByteBuffer message,
- RpcResponseCallback callback);
-
- /**
- * Returns the StreamManager which contains the state about which streams are currently being
- * fetched by a TransportClient.
- */
- public abstract StreamManager getStreamManager();
-
- /**
- * Receives an RPC message that does not expect a reply. The default implementation will
- * call "{@link #receive(TransportClient, ByteBuffer, RpcResponseCallback)}" and log a warning if
- * any of the callback methods are called.
- *
- * @param client A channel client which enables the handler to make requests back to the sender
- * of this RPC. This will always be the exact same object for a particular channel.
- * @param message The serialized bytes of the RPC.
- */
- public void receive(TransportClient client, ByteBuffer message) {
- receive(client, message, ONE_WAY_CALLBACK);
- }
-
- /**
- * Invoked when the channel associated with the given client is active.
- */
- public void channelActive(TransportClient client) { }
-
- /**
- * Invoked when the channel associated with the given client is inactive.
- * No further requests will come from this client.
- */
- public void channelInactive(TransportClient client) { }
-
- public void exceptionCaught(Throwable cause, TransportClient client) { }
-
- private static class OneWayRpcCallback implements RpcResponseCallback {
-
- private final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class);
-
- @Override
- public void onSuccess(ByteBuffer response) {
- logger.warn("Response provided for one-way RPC.");
- }
-
- @Override
- public void onFailure(Throwable e) {
- logger.error("Error response provided for one-way RPC.", e);
- }
-
- }
-
-}
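
To make the two required methods above concrete, a hedged sketch of the smallest useful
handler, one that echoes every RPC back to the sender (OneForOneStreamManager is from this
same commit):

    public class EchoRpcHandler extends RpcHandler {
      private final StreamManager streamManager = new OneForOneStreamManager();

      @Override
      public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
        callback.onSuccess(message.duplicate());  // invoke the callback exactly once
      }

      @Override
      public StreamManager getStreamManager() { return streamManager; }
    }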
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
deleted file mode 100644
index 07f161a..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import io.netty.channel.Channel;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.client.TransportClient;
-
-/**
- * The StreamManager is used to fetch individual chunks from a stream. This is used in
- * {@link TransportRequestHandler} in order to respond to fetchChunk() requests. Creation of the
- * stream is outside the scope of the transport layer, but a given stream is guaranteed to be read
- * by only one client connection, meaning that getChunk() for a particular stream will be called
- * serially and that once the connection associated with the stream is closed, that stream will
- * never be used again.
- */
-public abstract class StreamManager {
- /**
- * Called in response to a fetchChunk() request. The returned buffer will be passed as-is to the
- * client. A single stream will be associated with a single TCP connection, so this method
- * will not be called in parallel for a particular stream.
- *
- * Chunks may be requested in any order, and requests may be repeated, but it is not required
- * that implementations support this behavior.
- *
- * The returned ManagedBuffer will be release()'d after being written to the network.
- *
- * @param streamId id of a stream that has been previously registered with the StreamManager.
- * @param chunkIndex 0-indexed chunk of the stream that's requested
- */
- public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
-
- /**
- * Called in response to a stream() request. The returned data is streamed to the client
- * through a single TCP connection.
- *
- * Note the <code>streamId</code> argument is not related to the similarly named argument in the
- * {@link #getChunk(long, int)} method.
- *
- * @param streamId id of a stream that has been previously registered with the StreamManager.
- * @return A managed buffer for the stream, or null if the stream was not found.
- */
- public ManagedBuffer openStream(String streamId) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Associates a stream with a single client connection, which is guaranteed to be the only reader
- * of the stream. The getChunk() method will be called serially on this connection and once the
- * connection is closed, the stream will never be used again, enabling cleanup.
- *
- * This must be called before the first getChunk() on the stream, but it may be invoked multiple
- * times with the same channel and stream id.
- */
- public void registerChannel(Channel channel, long streamId) { }
-
- /**
- * Indicates that the given channel has been terminated. After this occurs, we are guaranteed not
- * to read from the associated streams again, so any state can be cleaned up.
- */
- public void connectionTerminated(Channel channel) { }
-
- /**
- * Verify that the client is authorized to read from the given stream.
- *
- * @throws SecurityException If client is not authorized.
- */
- public void checkAuthorization(TransportClient client, long streamId) { }
-
-}
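
Pulling the contract above together, the transport layer drives a StreamManager roughly in
this order for a chunked fetch (this mirrors TransportRequestHandler.processFetchRequest
later in this commit):

    streamManager.checkAuthorization(client, streamId);  // may throw SecurityException
    streamManager.registerChannel(channel, streamId);    // before the first getChunk()
    ManagedBuffer chunk = streamManager.getChunk(streamId, 0);
    // ... getChunk() is called serially, once per chunk, on this one connection ...
    streamManager.connectionTerminated(channel);         // on close: clean up stream state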
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
deleted file mode 100644
index 18a9b78..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.client.TransportResponseHandler;
-import org.apache.spark.network.protocol.Message;
-import org.apache.spark.network.protocol.RequestMessage;
-import org.apache.spark.network.protocol.ResponseMessage;
-import org.apache.spark.network.util.NettyUtils;
-
-/**
- * The single Transport-level Channel handler which is used for delegating requests to the
- * {@link TransportRequestHandler} and responses to the {@link TransportResponseHandler}.
- *
- * All channels created in the transport layer are bidirectional. When the Client initiates a Netty
- * Channel with a RequestMessage (which gets handled by the Server's RequestHandler), the Server
- * will produce a ResponseMessage (handled by the Client's ResponseHandler). However, the Server
- * also gets a handle on the same Channel, so it may then begin to send RequestMessages to the
- * Client.
- * This means that the Client also needs a RequestHandler and the Server needs a ResponseHandler,
- * for the Client's responses to the Server's requests.
- *
- * This class also handles timeouts from a {@link io.netty.handler.timeout.IdleStateHandler}.
- * We consider a connection timed out if there are outstanding fetch or RPC requests but no traffic
- * on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not
- * timeout if the client is continuously sending but getting no responses, for simplicity.
- */
-public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
- private final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class);
-
- private final TransportClient client;
- private final TransportResponseHandler responseHandler;
- private final TransportRequestHandler requestHandler;
- private final long requestTimeoutNs;
- private final boolean closeIdleConnections;
-
- public TransportChannelHandler(
- TransportClient client,
- TransportResponseHandler responseHandler,
- TransportRequestHandler requestHandler,
- long requestTimeoutMs,
- boolean closeIdleConnections) {
- this.client = client;
- this.responseHandler = responseHandler;
- this.requestHandler = requestHandler;
- this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
- this.closeIdleConnections = closeIdleConnections;
- }
-
- public TransportClient getClient() {
- return client;
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- logger.warn("Exception in connection from " + NettyUtils.getRemoteAddress(ctx.channel()),
- cause);
- requestHandler.exceptionCaught(cause);
- responseHandler.exceptionCaught(cause);
- ctx.close();
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- try {
- requestHandler.channelActive();
- } catch (RuntimeException e) {
- logger.error("Exception from request handler while registering channel", e);
- }
- try {
- responseHandler.channelActive();
- } catch (RuntimeException e) {
- logger.error("Exception from response handler while registering channel", e);
- }
- super.channelRegistered(ctx);
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- try {
- requestHandler.channelInactive();
- } catch (RuntimeException e) {
- logger.error("Exception from request handler while unregistering channel", e);
- }
- try {
- responseHandler.channelInactive();
- } catch (RuntimeException e) {
- logger.error("Exception from response handler while unregistering channel", e);
- }
- super.channelUnregistered(ctx);
- }
-
- @Override
- public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
- if (request instanceof RequestMessage) {
- requestHandler.handle((RequestMessage) request);
- } else {
- responseHandler.handle((ResponseMessage) request);
- }
- }
-
- /** Triggered based on events from an {@link io.netty.handler.timeout.IdleStateHandler}. */
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent e = (IdleStateEvent) evt;
- // See class comment for timeout semantics. In addition to ensuring we only timeout while
- // there are outstanding requests, we also do a secondary consistency check to ensure
- // there's no race between the idle timeout and incrementing the numOutstandingRequests
- // (see SPARK-7003).
- //
- // To avoid a race between TransportClientFactory.createClient() and this code which could
- // result in an inactive client being returned, this needs to run in a synchronized block.
- synchronized (this) {
- boolean isActuallyOverdue =
- System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
- if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
- if (responseHandler.numOutstandingRequests() > 0) {
- String address = NettyUtils.getRemoteAddress(ctx.channel());
- logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
- "requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
- "is wrong.", address, requestTimeoutNs / 1000 / 1000);
- client.timeOut();
- ctx.close();
- } else if (closeIdleConnections) {
- // When closeIdleConnections is enabled, we also close idle connections.
- client.timeOut();
- ctx.close();
- }
- }
- }
- }
- ctx.fireUserEventTriggered(evt);
- }
-
- public TransportResponseHandler getResponseHandler() {
- return responseHandler;
- }
-
-}
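
The IdleStateEvent handled above only fires if an IdleStateHandler sits earlier in the Netty
pipeline; that wiring lives in TransportContext, which is not part of this file, so the
sketch below is an assumption about the arrangement rather than a quote of it:

    // Fire an ALL_IDLE event after requestTimeoutMs with no reads and no writes;
    // userEventTriggered() above then double-checks for outstanding requests
    // before declaring the connection dead.
    ch.pipeline()
      .addLast("idleStateHandler",
          new IdleStateHandler(0, 0, requestTimeoutMs, TimeUnit.MILLISECONDS))
      .addLast("handler", transportChannelHandler);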
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
deleted file mode 100644
index 296ced3..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NioManagedBuffer;
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.protocol.ChunkFetchRequest;
-import org.apache.spark.network.protocol.ChunkFetchFailure;
-import org.apache.spark.network.protocol.ChunkFetchSuccess;
-import org.apache.spark.network.protocol.Encodable;
-import org.apache.spark.network.protocol.OneWayMessage;
-import org.apache.spark.network.protocol.RequestMessage;
-import org.apache.spark.network.protocol.RpcFailure;
-import org.apache.spark.network.protocol.RpcRequest;
-import org.apache.spark.network.protocol.RpcResponse;
-import org.apache.spark.network.protocol.StreamFailure;
-import org.apache.spark.network.protocol.StreamRequest;
-import org.apache.spark.network.protocol.StreamResponse;
-import org.apache.spark.network.util.NettyUtils;
-
-/**
- * A handler that processes requests from clients and writes chunk data back. Each handler is
- * attached to a single Netty channel, and keeps track of which streams have been fetched via this
- * channel, in order to clean them up if the channel is terminated (see #channelInactive).
- *
- * The messages should have been processed by the pipeline setup by {@link TransportServer}.
- */
-public class TransportRequestHandler extends MessageHandler<RequestMessage> {
- private final Logger logger = LoggerFactory.getLogger(TransportRequestHandler.class);
-
- /** The Netty channel that this handler is associated with. */
- private final Channel channel;
-
- /** Client on the same channel allowing us to talk back to the requester. */
- private final TransportClient reverseClient;
-
- /** Handles all RPC messages. */
- private final RpcHandler rpcHandler;
-
- /** Returns each chunk part of a stream. */
- private final StreamManager streamManager;
-
- public TransportRequestHandler(
- Channel channel,
- TransportClient reverseClient,
- RpcHandler rpcHandler) {
- this.channel = channel;
- this.reverseClient = reverseClient;
- this.rpcHandler = rpcHandler;
- this.streamManager = rpcHandler.getStreamManager();
- }
-
- @Override
- public void exceptionCaught(Throwable cause) {
- rpcHandler.exceptionCaught(cause, reverseClient);
- }
-
- @Override
- public void channelActive() {
- rpcHandler.channelActive(reverseClient);
- }
-
- @Override
- public void channelInactive() {
- if (streamManager != null) {
- try {
- streamManager.connectionTerminated(channel);
- } catch (RuntimeException e) {
- logger.error("StreamManager connectionTerminated() callback failed.", e);
- }
- }
- rpcHandler.channelInactive(reverseClient);
- }
-
- @Override
- public void handle(RequestMessage request) {
- if (request instanceof ChunkFetchRequest) {
- processFetchRequest((ChunkFetchRequest) request);
- } else if (request instanceof RpcRequest) {
- processRpcRequest((RpcRequest) request);
- } else if (request instanceof OneWayMessage) {
- processOneWayMessage((OneWayMessage) request);
- } else if (request instanceof StreamRequest) {
- processStreamRequest((StreamRequest) request);
- } else {
- throw new IllegalArgumentException("Unknown request type: " + request);
- }
- }
-
- private void processFetchRequest(final ChunkFetchRequest req) {
- final String client = NettyUtils.getRemoteAddress(channel);
-
- logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId);
-
- ManagedBuffer buf;
- try {
- streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
- streamManager.registerChannel(channel, req.streamChunkId.streamId);
- buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
- } catch (Exception e) {
- logger.error(String.format(
- "Error opening block %s for request from %s", req.streamChunkId, client), e);
- respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
- return;
- }
-
- respond(new ChunkFetchSuccess(req.streamChunkId, buf));
- }
-
- private void processStreamRequest(final StreamRequest req) {
- final String client = NettyUtils.getRemoteAddress(channel);
- ManagedBuffer buf;
- try {
- buf = streamManager.openStream(req.streamId);
- } catch (Exception e) {
- logger.error(String.format(
- "Error opening stream %s for request from %s", req.streamId, client), e);
- respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
- return;
- }
-
- if (buf != null) {
- respond(new StreamResponse(req.streamId, buf.size(), buf));
- } else {
- respond(new StreamFailure(req.streamId, String.format(
- "Stream '%s' was not found.", req.streamId)));
- }
- }
-
- private void processRpcRequest(final RpcRequest req) {
- try {
- rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
- @Override
- public void onSuccess(ByteBuffer response) {
- respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
- }
-
- @Override
- public void onFailure(Throwable e) {
- respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
- }
- });
- } catch (Exception e) {
- logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
- respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
- } finally {
- req.body().release();
- }
- }
-
- private void processOneWayMessage(OneWayMessage req) {
- try {
- rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
- } catch (Exception e) {
- logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
- } finally {
- req.body().release();
- }
- }
-
- /**
- * Responds to a single message with some Encodable object. If a failure occurs while sending,
- * it will be logged and the channel closed.
- */
- private void respond(final Encodable result) {
- final String remoteAddress = channel.remoteAddress().toString();
- channel.writeAndFlush(result).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- logger.trace(String.format("Sent result %s to client %s", result, remoteAddress));
- } else {
- logger.error(String.format("Error sending result %s to %s; closing connection",
- result, remoteAddress), future.cause());
- channel.close();
- }
- }
- }
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
deleted file mode 100644
index baae235..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.io.Closeable;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import org.apache.spark.network.util.JavaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.util.IOMode;
-import org.apache.spark.network.util.NettyUtils;
-import org.apache.spark.network.util.TransportConf;
-
-/**
- * Server for the efficient, low-level streaming service.
- */
-public class TransportServer implements Closeable {
- private final Logger logger = LoggerFactory.getLogger(TransportServer.class);
-
- private final TransportContext context;
- private final TransportConf conf;
- private final RpcHandler appRpcHandler;
- private final List<TransportServerBootstrap> bootstraps;
-
- private ServerBootstrap bootstrap;
- private ChannelFuture channelFuture;
- private int port = -1;
-
- /**
- * Creates a TransportServer that binds to the given host and port, or to any available port
- * if the given port is 0. To bind on all interfaces, set "hostToBind" to null.
- */
- public TransportServer(
- TransportContext context,
- String hostToBind,
- int portToBind,
- RpcHandler appRpcHandler,
- List<TransportServerBootstrap> bootstraps) {
- this.context = context;
- this.conf = context.getConf();
- this.appRpcHandler = appRpcHandler;
- this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
-
- try {
- init(hostToBind, portToBind);
- } catch (RuntimeException e) {
- JavaUtils.closeQuietly(this);
- throw e;
- }
- }
-
- public int getPort() {
- if (port == -1) {
- throw new IllegalStateException("Server not initialized");
- }
- return port;
- }
-
- private void init(String hostToBind, int portToBind) {
-
- IOMode ioMode = IOMode.valueOf(conf.ioMode());
- EventLoopGroup bossGroup =
- NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
- EventLoopGroup workerGroup = bossGroup;
-
- PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
- conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
-
- bootstrap = new ServerBootstrap()
- .group(bossGroup, workerGroup)
- .channel(NettyUtils.getServerChannelClass(ioMode))
- .option(ChannelOption.ALLOCATOR, allocator)
- .childOption(ChannelOption.ALLOCATOR, allocator);
-
- if (conf.backLog() > 0) {
- bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
- }
-
- if (conf.receiveBuf() > 0) {
- bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
- }
-
- if (conf.sendBuf() > 0) {
- bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
- }
-
- bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- RpcHandler rpcHandler = appRpcHandler;
- for (TransportServerBootstrap bootstrap : bootstraps) {
- rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
- }
- context.initializePipeline(ch, rpcHandler);
- }
- });
-
- InetSocketAddress address = hostToBind == null ?
- new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
- channelFuture = bootstrap.bind(address);
- channelFuture.syncUninterruptibly();
-
- port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
- logger.debug("Shuffle server started on port :" + port);
- }
-
- @Override
- public void close() {
- if (channelFuture != null) {
- // close is a local operation and should finish within milliseconds; timeout just to be safe
- channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
- channelFuture = null;
- }
- if (bootstrap != null && bootstrap.group() != null) {
- bootstrap.group().shutdownGracefully();
- }
- if (bootstrap != null && bootstrap.childGroup() != null) {
- bootstrap.childGroup().shutdownGracefully();
- }
- bootstrap = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/TransportServerBootstrap.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServerBootstrap.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServerBootstrap.java
deleted file mode 100644
index 05803ab..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServerBootstrap.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import io.netty.channel.Channel;
-
-/**
- * A bootstrap which is executed on a TransportServer's client channel once a client connects
- * to the server. This allows customizing the client channel for things such as SASL
- * authentication.
- */
-public interface TransportServerBootstrap {
- /**
- * Customizes the channel to include new features, if needed.
- *
- * @param channel The connected channel opened by the client.
- * @param rpcHandler The RPC handler for the server.
- * @return The RPC handler to use for the channel.
- */
- RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
-}
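
A hedged sketch of an implementation of the interface above: a bootstrap that observes the
new channel and returns the handler unchanged (a real implementation would typically return
a wrapping handler, e.g. for SASL):

    public class LoggingServerBootstrap implements TransportServerBootstrap {
      @Override
      public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
        System.out.println("client connected: " + channel.remoteAddress());
        return rpcHandler;  // return a wrapped RpcHandler here to intercept RPCs
      }
    }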
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/ByteArrayWritableChannel.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/ByteArrayWritableChannel.java b/network/common/src/main/java/org/apache/spark/network/util/ByteArrayWritableChannel.java
deleted file mode 100644
index b141572..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/ByteArrayWritableChannel.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-
-/**
- * A writable channel that stores the written data in a byte array in memory.
- */
-public class ByteArrayWritableChannel implements WritableByteChannel {
-
- private final byte[] data;
- private int offset;
-
- public ByteArrayWritableChannel(int size) {
- this.data = new byte[size];
- }
-
- public byte[] getData() {
- return data;
- }
-
- public int length() {
- return offset;
- }
-
- /** Resets the channel so that writing to it will overwrite the existing buffer. */
- public void reset() {
- offset = 0;
- }
-
- /**
- * Reads from the given buffer into the internal byte array.
- */
- @Override
- public int write(ByteBuffer src) {
- int toTransfer = Math.min(src.remaining(), data.length - offset);
- src.get(data, offset, toTransfer);
- offset += toTransfer;
- return toTransfer;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public boolean isOpen() {
- return true;
- }
-
-}
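
A short usage sketch of the channel above, writing a few bytes and inspecting the result:

    ByteArrayWritableChannel channel = new ByteArrayWritableChannel(16);
    channel.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
    byte[] backing = channel.getData();  // the full 16-byte backing array
    int written = channel.length();      // 5; only this prefix holds valid data
    channel.reset();                     // next write() overwrites from offset 0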
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java b/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java
deleted file mode 100644
index a2f0183..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.network.util;
-
-public enum ByteUnit {
- BYTE (1),
- KiB (1024L),
- MiB ((long) Math.pow(1024L, 2L)),
- GiB ((long) Math.pow(1024L, 3L)),
- TiB ((long) Math.pow(1024L, 4L)),
- PiB ((long) Math.pow(1024L, 5L));
-
- private ByteUnit(long multiplier) {
- this.multiplier = multiplier;
- }
-
- // Interpret the provided number (d) with suffix (u) as this unit type.
- // E.g. KiB.convertFrom(1, MiB) interprets 1 MiB as its KiB representation = 1024 KiB.
- public long convertFrom(long d, ByteUnit u) {
- return u.convertTo(d, this);
- }
-
- // Convert the provided number (d) interpreted as this unit type to unit type (u).
- public long convertTo(long d, ByteUnit u) {
- if (multiplier > u.multiplier) {
- long ratio = multiplier / u.multiplier;
- if (Long.MAX_VALUE / ratio < d) {
- throw new IllegalArgumentException("Conversion of " + d + " exceeds Long.MAX_VALUE in "
- + name() + ". Try a larger unit (e.g. MiB instead of KiB)");
- }
- return d * ratio;
- } else {
- // Perform operations in this order to avoid potential overflow
- // when computing d * multiplier
- return d / (u.multiplier / multiplier);
- }
- }
-
- public double toBytes(long d) {
- if (d < 0) {
- throw new IllegalArgumentException("Negative size value. Size must be positive: " + d);
- }
- return d * multiplier;
- }
-
- public long toKiB(long d) { return convertTo(d, KiB); }
- public long toMiB(long d) { return convertTo(d, MiB); }
- public long toGiB(long d) { return convertTo(d, GiB); }
- public long toTiB(long d) { return convertTo(d, TiB); }
- public long toPiB(long d) { return convertTo(d, PiB); }
-
- private final long multiplier;
-}
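
A few worked conversions to pin down the two directions above (convertFrom reads a value
expressed in another unit; convertTo expresses this unit's value in another):

    long kib = ByteUnit.KiB.convertFrom(1, ByteUnit.MiB);   // 1024: 1 MiB expressed in KiB
    long mib = ByteUnit.KiB.convertTo(2048, ByteUnit.MiB);  // 2: 2048 KiB expressed in MiB
    double bytes = ByteUnit.MiB.toBytes(3);                 // 3145728.0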
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java b/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
deleted file mode 100644
index d944d9d..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.util.NoSuchElementException;
-
-/**
- * Provides a mechanism for constructing a {@link TransportConf} using some sort of configuration.
- */
-public abstract class ConfigProvider {
- /** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist. */
- public abstract String get(String name);
-
- public String get(String name, String defaultValue) {
- try {
- return get(name);
- } catch (NoSuchElementException e) {
- return defaultValue;
- }
- }
-
- public int getInt(String name, int defaultValue) {
- return Integer.parseInt(get(name, Integer.toString(defaultValue)));
- }
-
- public long getLong(String name, long defaultValue) {
- return Long.parseLong(get(name, Long.toString(defaultValue)));
- }
-
- public double getDouble(String name, double defaultValue) {
- return Double.parseDouble(get(name, Double.toString(defaultValue)));
- }
-
- public boolean getBoolean(String name, boolean defaultValue) {
- return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
- }
-}
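
A minimal sketch of how a concrete provider plugs into the typed
getters above; the anonymous subclass and the config keys are
illustrative only:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.NoSuchElementException;
    import org.apache.spark.network.util.ConfigProvider;

    public class ConfigProviderExample {
      public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("spark.shuffle.io.numConnectionsPerPeer", "2");

        ConfigProvider conf = new ConfigProvider() {
          @Override
          public String get(String name) {
            String value = values.get(name);
            if (value == null) {
              throw new NoSuchElementException(name);
            }
            return value;
          }
        };

        // Present key: parsed from the stored string.
        System.out.println(conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1)); // 2
        // Missing key: the typed default round-trips through its string form.
        System.out.println(conf.getBoolean("spark.network.sasl.enabled", false)); // false
      }
    }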
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/IOMode.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/IOMode.java b/network/common/src/main/java/org/apache/spark/network/util/IOMode.java
deleted file mode 100644
index 6b208d9..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/IOMode.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-/**
- * Selector for which form of low-level IO we should use.
- * NIO is always available, while EPOLL is only available on Linux.
- */
-public enum IOMode {
- NIO, EPOLL
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
deleted file mode 100644
index b3d8e0c..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.Unpooled;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * General utilities available in the network package. Many of these are sourced from Spark's
- * own Utils, just accessible within this package.
- */
-public class JavaUtils {
- private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class);
-
- /**
- * Define a default value for driver memory here since this value is referenced across the code
- * base and nearly all files already use Utils.scala
- */
- public static final long DEFAULT_DRIVER_MEM_MB = 1024;
-
- /** Closes the given object, ignoring IOExceptions. */
- public static void closeQuietly(Closeable closeable) {
- try {
- if (closeable != null) {
- closeable.close();
- }
- } catch (IOException e) {
- logger.error("IOException should not have been thrown.", e);
- }
- }
-
- /** Returns a hash consistent with Spark's Utils.nonNegativeHash(). */
- public static int nonNegativeHash(Object obj) {
- if (obj == null) { return 0; }
- int hash = obj.hashCode();
- return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
- }
-
- /**
- * Convert the given string to a byte buffer. The resulting buffer can be
- * converted back to the same string through {@link #bytesToString(ByteBuffer)}.
- */
- public static ByteBuffer stringToBytes(String s) {
- return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer();
- }
-
- /**
- * Convert the given byte buffer to a string. The resulting string can be
- * converted back to the same byte buffer through {@link #stringToBytes(String)}.
- */
- public static String bytesToString(ByteBuffer b) {
- return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
- }
-
- /**
- * Delete a file or directory and its contents recursively.
- * Don't follow directories if they are symlinks.
- * Throws an exception if deletion is unsuccessful.
- */
- public static void deleteRecursively(File file) throws IOException {
- if (file == null) { return; }
-
- if (file.isDirectory() && !isSymlink(file)) {
- IOException savedIOException = null;
- for (File child : listFilesSafely(file)) {
- try {
- deleteRecursively(child);
- } catch (IOException e) {
- // In case of multiple exceptions, only the last one will be thrown.
- savedIOException = e;
- }
- }
- if (savedIOException != null) {
- throw savedIOException;
- }
- }
-
- boolean deleted = file.delete();
- // Delete can also fail if the file simply did not exist.
- if (!deleted && file.exists()) {
- throw new IOException("Failed to delete: " + file.getAbsolutePath());
- }
- }
-
- private static File[] listFilesSafely(File file) throws IOException {
- if (file.exists()) {
- File[] files = file.listFiles();
- if (files == null) {
- throw new IOException("Failed to list files for dir: " + file);
- }
- return files;
- } else {
- return new File[0];
- }
- }
-
- private static boolean isSymlink(File file) throws IOException {
- Preconditions.checkNotNull(file);
- File fileInCanonicalDir = null;
- if (file.getParent() == null) {
- fileInCanonicalDir = file;
- } else {
- fileInCanonicalDir = new File(file.getParentFile().getCanonicalFile(), file.getName());
- }
- return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
- }
-
- private static final ImmutableMap<String, TimeUnit> timeSuffixes =
- ImmutableMap.<String, TimeUnit>builder()
- .put("us", TimeUnit.MICROSECONDS)
- .put("ms", TimeUnit.MILLISECONDS)
- .put("s", TimeUnit.SECONDS)
- .put("m", TimeUnit.MINUTES)
- .put("min", TimeUnit.MINUTES)
- .put("h", TimeUnit.HOURS)
- .put("d", TimeUnit.DAYS)
- .build();
-
- private static final ImmutableMap<String, ByteUnit> byteSuffixes =
- ImmutableMap.<String, ByteUnit>builder()
- .put("b", ByteUnit.BYTE)
- .put("k", ByteUnit.KiB)
- .put("kb", ByteUnit.KiB)
- .put("m", ByteUnit.MiB)
- .put("mb", ByteUnit.MiB)
- .put("g", ByteUnit.GiB)
- .put("gb", ByteUnit.GiB)
- .put("t", ByteUnit.TiB)
- .put("tb", ByteUnit.TiB)
- .put("p", ByteUnit.PiB)
- .put("pb", ByteUnit.PiB)
- .build();
-
- /**
- * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count for
- * internal use. If no suffix is provided a direct conversion is attempted.
- */
- private static long parseTimeString(String str, TimeUnit unit) {
- String lower = str.toLowerCase().trim();
-
- try {
- Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(lower);
- if (!m.matches()) {
- throw new NumberFormatException("Failed to parse time string: " + str);
- }
-
- long val = Long.parseLong(m.group(1));
- String suffix = m.group(2);
-
- // Check for invalid suffixes
- if (suffix != null && !timeSuffixes.containsKey(suffix)) {
- throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
- }
-
- // If a suffix was provided use it; otherwise fall back to the default unit.
- return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
- } catch (NumberFormatException e) {
- String timeError = "Time must be specified as seconds (s), " +
- "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). " +
- "E.g. 50s, 100ms, or 250us.";
-
- throw new NumberFormatException(timeError + "\n" + e.getMessage());
- }
- }
-
- /**
- * Convert a time parameter such as 50s, 100ms, or 250us to milliseconds for internal use. If
- * no suffix is provided, the passed number is assumed to be in ms.
- */
- public static long timeStringAsMs(String str) {
- return parseTimeString(str, TimeUnit.MILLISECONDS);
- }
-
- /**
- * Convert a time parameter such as 50s, 100ms, or 250us to seconds for internal use. If
- * no suffix is provided, the passed number is assumed to be in seconds.
- */
- public static long timeStringAsSec(String str) {
- return parseTimeString(str, TimeUnit.SECONDS);
- }
-
- /**
- * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to a count in the given ByteUnit
- * for internal use. If no suffix is provided, the value is interpreted directly in the
- * default unit.
- */
- private static long parseByteString(String str, ByteUnit unit) {
- String lower = str.toLowerCase().trim();
-
- try {
- Matcher m = Pattern.compile("([0-9]+)([a-z]+)?").matcher(lower);
- Matcher fractionMatcher = Pattern.compile("([0-9]+\\.[0-9]+)([a-z]+)?").matcher(lower);
-
- if (m.matches()) {
- long val = Long.parseLong(m.group(1));
- String suffix = m.group(2);
-
- // Check for invalid suffixes
- if (suffix != null && !byteSuffixes.containsKey(suffix)) {
- throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
- }
-
- // If a suffix was provided use it; otherwise fall back to the default unit.
- return unit.convertFrom(val, suffix != null ? byteSuffixes.get(suffix) : unit);
- } else if (fractionMatcher.matches()) {
- throw new NumberFormatException("Fractional values are not supported. Input was: "
- + fractionMatcher.group(1));
- } else {
- throw new NumberFormatException("Failed to parse byte string: " + str);
- }
-
- } catch (NumberFormatException e) {
- String sizeError = "Size must be specified as bytes (b), " +
- "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or pebibytes (p). " +
- "E.g. 50b, 100k, or 250m.";
-
- throw new NumberFormatException(sizeError + "\n" + e.getMessage());
- }
- }
-
- /**
- * Convert a passed byte string (e.g. 50b, 100k, or 250m) to bytes for
- * internal use.
- *
- * If no suffix is provided, the passed number is assumed to be in bytes.
- */
- public static long byteStringAsBytes(String str) {
- return parseByteString(str, ByteUnit.BYTE);
- }
-
- /**
- * Convert a passed byte string (e.g. 50b, 100k, or 250m) to kibibytes for
- * internal use.
- *
- * If no suffix is provided, the passed number is assumed to be in kibibytes.
- */
- public static long byteStringAsKb(String str) {
- return parseByteString(str, ByteUnit.KiB);
- }
-
- /**
- * Convert a passed byte string (e.g. 50b, 100k, or 250m) to mebibytes for
- * internal use.
- *
- * If no suffix is provided, the passed number is assumed to be in mebibytes.
- */
- public static long byteStringAsMb(String str) {
- return parseByteString(str, ByteUnit.MiB);
- }
-
- /**
- * Convert a passed byte string (e.g. 50b, 100k, or 250m) to gibibytes for
- * internal use.
- *
- * If no suffix is provided, the passed number is assumed to be in gibibytes.
- */
- public static long byteStringAsGb(String str) {
- return parseByteString(str, ByteUnit.GiB);
- }
-
- /**
- * Returns a byte array with the buffer's contents, trying to avoid copying the data if
- * possible.
- */
- public static byte[] bufferToArray(ByteBuffer buffer) {
- if (buffer.hasArray() && buffer.arrayOffset() == 0 &&
- buffer.array().length == buffer.remaining()) {
- return buffer.array();
- } else {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- return bytes;
- }
- }
-
-}
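
A short sketch of the time and size parsers above (the wrapper class
ParseExample is illustrative); note that conversions truncate toward
zero and fractional sizes are rejected:

    import org.apache.spark.network.util.JavaUtils;

    public class ParseExample {
      public static void main(String[] args) {
        System.out.println(JavaUtils.timeStringAsMs("50s"));    // 50000
        System.out.println(JavaUtils.timeStringAsSec("100ms")); // 0 (truncates)
        System.out.println(JavaUtils.timeStringAsMs("250"));    // 250 (no suffix: already ms)

        System.out.println(JavaUtils.byteStringAsBytes("100k")); // 102400
        System.out.println(JavaUtils.byteStringAsMb("1g"));      // 1024
        // JavaUtils.byteStringAsMb("1.5g") would throw: fractions are unsupported.
      }
    }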
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java b/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
deleted file mode 100644
index 922c37a..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Based on LimitedInputStream.java from Google Guava
- *
- * Copyright (C) 2007 The Guava Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Wraps an {@link InputStream}, limiting the number of bytes which can be read.
- *
- * This code is from Guava's 14.0 source code, because there is no compatible way to
- * use this functionality in both a Guava 11 environment and a Guava >14 environment.
- */
-public final class LimitedInputStream extends FilterInputStream {
- private long left;
- private long mark = -1;
-
- public LimitedInputStream(InputStream in, long limit) {
- super(in);
- Preconditions.checkNotNull(in);
- Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
- left = limit;
- }
- @Override
- public int available() throws IOException {
- return (int) Math.min(in.available(), left);
- }
-
- // It's okay to mark even if mark isn't supported, as reset won't work.
- @Override
- public synchronized void mark(int readLimit) {
- in.mark(readLimit);
- mark = left;
- }
-
- @Override
- public int read() throws IOException {
- if (left == 0) {
- return -1;
- }
- int result = in.read();
- if (result != -1) {
- --left;
- }
- return result;
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- if (left == 0) {
- return -1;
- }
- len = (int) Math.min(len, left);
- int result = in.read(b, off, len);
- if (result != -1) {
- left -= result;
- }
- return result;
- }
-
- @Override
- public synchronized void reset() throws IOException {
- if (!in.markSupported()) {
- throw new IOException("Mark not supported");
- }
- if (mark == -1) {
- throw new IOException("Mark not set");
- }
- in.reset();
- left = mark;
- }
-
- @Override
- public long skip(long n) throws IOException {
- n = Math.min(n, left);
- long skipped = in.skip(n);
- left -= skipped;
- return skipped;
- }
-}
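
A small sketch of the limit behavior (the wrapper class LimitExample
is illustrative): reads stop at the limit even though the underlying
stream has more data:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.spark.network.util.LimitedInputStream;

    public class LimitExample {
      public static void main(String[] args) throws IOException {
        byte[] data = new byte[100];
        // Expose only the first 10 bytes of the underlying 100-byte stream.
        LimitedInputStream in = new LimitedInputStream(new ByteArrayInputStream(data), 10);
        byte[] buf = new byte[64];
        System.out.println(in.read(buf, 0, buf.length)); // 10 (clamped to the limit)
        System.out.println(in.read());                   // -1 (limit reached, not true EOF)
        in.close();
      }
    }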
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java b/network/common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
deleted file mode 100644
index 668d235..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-import com.google.common.collect.Maps;
-
-/** ConfigProvider based on a Map (copied in the constructor). */
-public class MapConfigProvider extends ConfigProvider {
- private final Map<String, String> config;
-
- public MapConfigProvider(Map<String, String> config) {
- this.config = Maps.newHashMap(config);
- }
-
- @Override
- public String get(String name) {
- String value = config.get(name);
- if (value == null) {
- throw new NoSuchElementException(name);
- }
- return value;
- }
-}
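
Worth noting the defensive copy in the constructor; a quick
illustrative sketch (the wrapper class MapConfigExample is not part of
this file):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.network.util.MapConfigProvider;

    public class MapConfigExample {
      public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("spark.network.timeout", "120s");

        MapConfigProvider provider = new MapConfigProvider(source);
        // The constructor copies the map, so later mutations are not observed.
        source.put("spark.network.timeout", "1s");

        System.out.println(provider.get("spark.network.timeout")); // 120s
      }
    }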
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
deleted file mode 100644
index caa7260..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.ThreadFactory;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.internal.PlatformDependent;
-
-/**
- * Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
- */
-public class NettyUtils {
- /** Creates a new ThreadFactory which prefixes each thread with the given name. */
- public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
- return new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat(threadPoolPrefix + "-%d")
- .build();
- }
-
- /** Creates a Netty EventLoopGroup based on the IOMode. */
- public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
- ThreadFactory threadFactory = createThreadFactory(threadPrefix);
-
- switch (mode) {
- case NIO:
- return new NioEventLoopGroup(numThreads, threadFactory);
- case EPOLL:
- return new EpollEventLoopGroup(numThreads, threadFactory);
- default:
- throw new IllegalArgumentException("Unknown io mode: " + mode);
- }
- }
-
- /** Returns the correct (client) SocketChannel class based on IOMode. */
- public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
- switch (mode) {
- case NIO:
- return NioSocketChannel.class;
- case EPOLL:
- return EpollSocketChannel.class;
- default:
- throw new IllegalArgumentException("Unknown io mode: " + mode);
- }
- }
-
- /** Returns the correct ServerSocketChannel class based on IOMode. */
- public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
- switch (mode) {
- case NIO:
- return NioServerSocketChannel.class;
- case EPOLL:
- return EpollServerSocketChannel.class;
- default:
- throw new IllegalArgumentException("Unknown io mode: " + mode);
- }
- }
-
- /**
- * Creates a TransportFrameDecoder, which expects the first 8 bytes of each frame to encode
- * the frame length. This is installed before all other decoders.
- */
- public static TransportFrameDecoder createFrameDecoder() {
- return new TransportFrameDecoder();
- }
-
- /** Returns the remote address on the channel or "<unknown remote>" if none exists. */
- public static String getRemoteAddress(Channel channel) {
- if (channel != null && channel.remoteAddress() != null) {
- return channel.remoteAddress().toString();
- }
- return "<unknown remote>";
- }
-
- /**
- * Creates a pooled ByteBuf allocator, optionally disabling thread-local caches. Thread-local caches
- * are disabled for TransportClients because the ByteBufs are allocated by the event loop thread,
- * but released by the executor thread rather than the event loop thread. Those thread-local
- * caches actually delay the recycling of buffers, leading to larger memory usage.
- */
- public static PooledByteBufAllocator createPooledByteBufAllocator(
- boolean allowDirectBufs,
- boolean allowCache,
- int numCores) {
- if (numCores == 0) {
- numCores = Runtime.getRuntime().availableProcessors();
- }
- return new PooledByteBufAllocator(
- allowDirectBufs && PlatformDependent.directBufferPreferred(),
- Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
- Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
- getPrivateStaticField("DEFAULT_PAGE_SIZE"),
- getPrivateStaticField("DEFAULT_MAX_ORDER"),
- allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
- allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
- allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
- );
- }
-
- /** Used to get defaults from Netty's private static fields. */
- private static int getPrivateStaticField(String name) {
- try {
- Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
- f.setAccessible(true);
- return f.getInt(null);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
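
A sketch of wiring a Netty client with the helpers above, assuming
Netty 4 on the classpath; the class name, thread count, and pool
prefix here are illustrative:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.EventLoopGroup;
    import org.apache.spark.network.util.IOMode;
    import org.apache.spark.network.util.NettyUtils;

    public class NettyUtilsExample {
      public static void main(String[] args) {
        IOMode mode = IOMode.NIO; // EPOLL is only available on Linux
        EventLoopGroup workerGroup = NettyUtils.createEventLoop(mode, 4, "example-client");
        try {
          // The channel class must match the event loop's IO mode.
          Bootstrap bootstrap = new Bootstrap()
            .group(workerGroup)
            .channel(NettyUtils.getClientChannelClass(mode));
          // Handlers and the remote address would be configured here before connect().
        } finally {
          workerGroup.shutdownGracefully();
        }
      }
    }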
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java b/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java
deleted file mode 100644
index 5f20b70..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.util.NoSuchElementException;
-
-/** Uses System properties to obtain config values. */
-public class SystemPropertyConfigProvider extends ConfigProvider {
- @Override
- public String get(String name) {
- String value = System.getProperty(name);
- if (value == null) {
- throw new NoSuchElementException(name);
- }
- return value;
- }
-}
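
A trivial illustrative sketch (the wrapper class SysPropExample and
the key names are not part of this file):

    import org.apache.spark.network.util.SystemPropertyConfigProvider;

    public class SysPropExample {
      public static void main(String[] args) {
        System.setProperty("spark.network.timeout", "120s");
        SystemPropertyConfigProvider conf = new SystemPropertyConfigProvider();
        System.out.println(conf.get("spark.network.timeout"));         // 120s
        System.out.println(conf.get("spark.missing.key", "fallback")); // fallback
      }
    }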
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org