You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/09/18 00:02:49 UTC
svn commit: r998347 - in /avro/trunk: ./ doc/src/content/xdocs/
lang/java/src/java/org/apache/avro/ipc/
lang/java/src/test/java/org/apache/avro/ipc/
Author: cutting
Date: Fri Sep 17 22:02:48 2010
New Revision: 998347
URL: http://svn.apache.org/viewvc?rev=998347&view=rev
Log:
AVRO-641. Java: Add SASL security for socket-based RPC.
Added:
avro/trunk/doc/src/content/xdocs/sasl.xml
avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketServer.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketTransceiver.java
avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java
avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/doc/src/content/xdocs/site.xml
avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=998347&r1=998346&r2=998347&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Sep 17 22:02:48 2010
@@ -6,6 +6,10 @@ Avro 1.5.0 (unreleased)
Avro 1.4.1 (unreleased)
+ NEW FEATURES
+
+ AVRO-641. Java: Add SASL security for socket-based RPC. (cutting)
+
IMPROVEMENTS
AVRO-655. Change build so that 'dist' target no longer also runs C
Added: avro/trunk/doc/src/content/xdocs/sasl.xml
URL: http://svn.apache.org/viewvc/avro/trunk/doc/src/content/xdocs/sasl.xml?rev=998347&view=auto
==============================================================================
--- avro/trunk/doc/src/content/xdocs/sasl.xml (added)
+++ avro/trunk/doc/src/content/xdocs/sasl.xml Fri Sep 17 22:02:48 2010
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
+<document>
+ <header>
+ <title>Avro SASL profile</title>
+ </header>
+ <body>
+ <section id="intro">
+ <title>Introduction</title>
+ <p>SASL (<a href="http://www.ietf.org/rfc/rfc2222.txt">RFC 2222</a>)
+ provides a framework for authentication and security of network
+ protocols. Each protocol that uses SASL is meant to define a
+ SASL <em>profile</em>. This document provides a SASL profile
+ for connection-based Avro RPC.</p>
+ </section>
+
+ <section id="overview">
+ <title>Overview</title>
+ <p>SASL negotiation proceeds as a series of message interactions
+ over a connection between a client and server using a selected
+ SASL <em>mechanism</em>. The client starts this negotiation by
+ sending its chosen mechanism name with an initial (possibly
+ empty) message. Negotiation proceeds with the exchange of
+ messages until either side indicates success or failure. The
+ content of the messages is mechanism-specific. If the
+ negotiation succeeds, then the session can proceed over the
+ connection, otherwise it must be abandoned.</p>
+ <p>Some mechanisms continue to process session data after
+ negotiation (e.g., encrypting it), while some specify that
+ further session data is transmitted unmodified.</p>
+ </section>
+
+ <section id="negotiation">
+ <title>Negotiation</title>
+ <section id="commands">
+ <title>Commands</title>
+ <p>Avro SASL negotiation uses four one-byte commands.</p>
+ <ul>
+ <li><code>0: START</code> Used in a client's initial message.</li>
+ <li><code>1: CONTINUE</code> Used while negotiation is ongoing.</li>
+ <li><code>2: FAIL</code> Terminates negotiation unsuccessfully.</li>
+ <li><code>3: COMPLETE</code> Terminates negotiation successfully.</li>
+ </ul>
+
+ <p>The format of a START message is:</p>
+ <source>| 0 | 4-byte mechanism name length | mechanism name | 4-byte payload length | payload data |</source>
+
+ <p>The format of a CONTINUE message is:</p>
+ <source>| 1 | 4-byte payload length | payload data |</source>
+
+ <p>The format of a FAIL message is:</p>
+ <source>| 2 | 4-byte message length | UTF-8 message |</source>
+
+ <p>The format of a COMPLETE message is:</p>
+ <source>| 3 | 4-byte payload length | payload data |</source>
+ </section>
+
+ <section id="process">
+ <title>Process</title>
+ <p>Negotiation is initiated by a client sending a START command
+ containing the client's chosen mechanism name and any
+ mechanism-specific payload data.</p>
+
+ <p>The server and client then interchange some number
+ (possibly zero) of CONTINUE messages. Each message contains
+ payload data that is processed by the security mechanism to
+ generate the next message.</p>
+
+ <p>Once either the client or server sends a FAIL message then
+ negotiation has failed. UTF-8-encoded text is included in
+ the failure message. Once a FAIL message has been
+ sent or received, or any other error occurs in the
+ negotiation, further communication on this connection must
+ cease.</p>
+
+ <p>Once either the client or server sends a COMPLETE message
+ then negotiation has completed successfully. Session data
+ may now be transmitted over the connection until it is
+ closed by either side.</p>
+ </section>
+
+ </section>
+
+ <section id="session">
+ <title>Session Data</title>
+ <p>If no SASL QOP (quality of protection) is negotiated, then
+ all subsequent writes to/reads over this connection are
+ written/read unmodified. In particular, messages use
+ Avro <a href="spec.html#Message+Framing">framing</a>, and are
+ of the form:</p>
+ <source>| 4-byte frame length | frame data | ... | 4 zero bytes |</source>
+ <p>If a SASL QOP is negotiated, then it must be used by the
+ connection for all subsequent messages. This is done by
+ wrapping each non-empty frame written using the security
+ mechanism and unwrapping each non-empty frame read. The
+ length written in each non-empty frame is the length of the
+ wrapped data. Complete frames must be passed to the security
+ mechanism for unwrapping. Unwrapped data is then passed to
+ the application as the content of the frame.</p>
+ <p>If at any point processing fails due to wrapping, unwrapping
+ or framing errors, then all further communication on this
+ connection must cease.</p>
+ </section>
+
+ <section id="anonymous">
+ <title>Anonymous Mechanism</title>
+ <p>The SASL anonymous mechanism
+ (<a href="http://www.ietf.org/rfc/rfc2245.txt">RFC 2245</a>) is
+ quite simple to implement. In particular, an initial anonymous
+ request may be prefixed by the following static sequence:</p>
+ <source>| 0 | 0009 | ANONYMOUS | 0000 |</source>
+ <p>If a server uses the anonymous mechanism, it should check
+ that the mechanism name in the start message prefixing the first
+ request received is 'ANONYMOUS', then simply prefix its initial
+ response with a COMPLETE message of:</p>
+ <source>| 3 | 0000 |</source>
+ <p>If an anonymous server receives some other mechanism name,
+ then it may respond with a FAIL message as simple as:</p>
+ <source>| 2 | 0000 |</source>
+ <p>Note that the anonymous mechanism need add no additional
+ round-trip messages between client and server. The START
+ message can be piggybacked on the initial request and the
+ COMPLETE or FAIL message can be piggybacked on the initial
+ response.</p>
+ </section>
+
+ </body>
+</document>
Modified: avro/trunk/doc/src/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/avro/trunk/doc/src/content/xdocs/site.xml?rev=998347&r1=998346&r2=998347&view=diff
==============================================================================
--- avro/trunk/doc/src/content/xdocs/site.xml (original)
+++ avro/trunk/doc/src/content/xdocs/site.xml Fri Sep 17 22:02:48 2010
@@ -46,6 +46,7 @@ See http://forrest.apache.org/docs/linki
<c-api label="C API" href="ext:api/c/index" />
<cpp-api label="C++ API" href="ext:api/cpp/index" />
<idl label="IDL language" href="idl.html" />
+ <sasl label="SASL profile" href="sasl.html" />
<wiki label="Wiki" href="ext:wiki" />
<faq label="FAQ" href="ext:faq" />
</docs>
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketServer.java?rev=998347&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketServer.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketServer.java Fri Sep 17 22:02:48 2010
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.util.Map;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import javax.security.auth.callback.CallbackHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link Server} that uses {@link javax.security.sasl} for authentication
+ * and encryption.  A new {@link SaslServer} is obtained from the configured
+ * factory for every channel passed to {@link #getTransceiver}. */
+public class SaslSocketServer extends SocketServer {
+  // Logger is named after this class, not the JDK SaslServer interface.
+  private static final Logger LOG =
+    LoggerFactory.getLogger(SaslSocketServer.class);
+
+  /** Supplies the {@link SaslServer} used to negotiate one connection. */
+  private static abstract class SaslServerFactory {
+    protected abstract SaslServer getServer() throws SaslException;
+  }
+
+  // Assigned once in the private constructor; never reassigned.
+  private final SaslServerFactory factory;
+
+  /** Create using SASL's anonymous (<a
+   * href="http://www.ietf.org/rfc/rfc2245.txt">RFC 2245</a>) mechanism.
+   * @param responder passed to the {@link SocketServer} constructor
+   * @param addr passed to the {@link SocketServer} constructor
+   * @throws IOException if the {@link SocketServer} constructor fails */
+  public SaslSocketServer(Responder responder, SocketAddress addr)
+    throws IOException {
+    this(responder, addr,
+         new SaslServerFactory() {
+           @Override public SaslServer getServer() {
+             return new AnonymousServer();
+           }
+         });
+  }
+
+  /** Create using the specified {@link SaslServer} parameters.  The
+   * mechanism, protocol, server name, properties and callback handler are
+   * handed unchanged to {@link Sasl#createSaslServer} each time a connection
+   * needs a server.
+   * @throws IOException if the {@link SocketServer} constructor fails */
+  public SaslSocketServer(Responder responder, SocketAddress addr,
+                          final String mechanism, final String protocol,
+                          final String serverName, final Map<String,?> props,
+                          final CallbackHandler cbh) throws IOException {
+    this(responder, addr,
+         new SaslServerFactory() {
+           @Override public SaslServer getServer() throws SaslException {
+             SaslServer server =
+               Sasl.createSaslServer(mechanism, protocol, serverName,
+                                     props, cbh);
+             // createSaslServer returns null when no provider supports the
+             // mechanism; fail here instead of with a later NPE.
+             if (server == null)
+               throw new SaslException("No SASL server for mechanism: "
+                                       +mechanism);
+             return server;
+           }
+         });
+  }
+
+  private SaslSocketServer(Responder responder, SocketAddress addr,
+                           SaslServerFactory factory) throws IOException {
+    super(responder, addr);
+    this.factory = factory;
+  }
+
+  /** Wraps <code>channel</code> in a {@link SaslSocketTransceiver} backed by
+   * a newly created {@link SaslServer}. */
+  @Override protected Transceiver getTransceiver(SocketChannel channel)
+    throws IOException {
+    return new SaslSocketTransceiver(channel, factory.getServer());
+  }
+
+  /** Minimal ANONYMOUS {@link SaslServer}: stores the client's response as
+   * the user name, issues no challenge, reports no negotiated properties and
+   * does not support wrapping. */
+  private static class AnonymousServer implements SaslServer {
+    private String user;
+    public String getMechanismName() { return "ANONYMOUS"; }
+    public byte[] evaluateResponse(byte[] response) throws SaslException {
+      try {
+        this.user = new String(response, "UTF-8");
+      } catch (IOException e) {
+        throw new SaslException(e.toString(), e);   // keep the cause
+      }
+      return null;                                   // nothing to send back
+    }
+    // Complete as soon as one response has been evaluated.
+    public boolean isComplete() { return user != null; }
+    public String getAuthorizationID() { return user; }
+    public byte[] unwrap(byte[] incoming, int offset, int len) {
+      throw new UnsupportedOperationException();
+    }
+    public byte[] wrap(byte[] outgoing, int offset, int len) {
+      throw new UnsupportedOperationException();
+    }
+    public Object getNegotiatedProperty(String propName) { return null; }
+    public void dispose() {}
+  }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketTransceiver.java?rev=998347&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketTransceiver.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketTransceiver.java Fri Sep 17 22:02:48 2010
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.UnsupportedEncodingException;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
+
+import org.apache.avro.Protocol;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link Transceiver} that uses {@link javax.security.sasl} for
+ * authentication and encryption. */
+public class SaslSocketTransceiver extends Transceiver {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SaslSocketTransceiver.class);
+
+ private static final ByteBuffer EMPTY = ByteBuffer.allocate(0); // zero-length payload used when there is no SASL data to send
+
+ private static enum Status { START, CONTINUE, FAIL, COMPLETE } // ordinal() is written as the one-byte status on the wire, so order matters
+
+ private SaslParticipant sasl; // client or server side of the negotiation
+ private SocketChannel channel;
+ private boolean dataIsWrapped; // set at the end of open(): true when a QOP other than "auth" was negotiated
+ private boolean saslResponsePiggybacked; // set in open() when the client finished before reading the server's reply; transceive() reads it
+
+ private Protocol remote;
+
+ private ByteBuffer readHeader = ByteBuffer.allocate(4); // reused 4-byte frame-length buffer for reads
+ private ByteBuffer writeHeader = ByteBuffer.allocate(4); // reused 4-byte frame-length buffer for writes
+ private ByteBuffer zeroHeader = ByteBuffer.allocate(4).putInt(0); // four zero bytes appended by writeBuffers() as the terminator
+
+ /** Create using SASL's anonymous (<a
+ * href="http://www.ietf.org/rfc/rfc2245.txt">RFC 2245</a>) mechanism.
+ * Delegates to the {@link SaslClient} constructor with an anonymous client. */
+ public SaslSocketTransceiver(SocketAddress address) throws IOException {
+ this(address, new AnonymousClient());
+ }
+
+ /** Create using the specified {@link SaslClient}.  Opens a channel to
+  * <code>address</code> and runs the client side of SASL negotiation
+  * before returning.
+  * @throws IOException if connecting or negotiating fails; the channel
+  * opened here is closed before the exception propagates */
+ public SaslSocketTransceiver(SocketAddress address, SaslClient saslClient)
+   throws IOException {
+   this.sasl = new SaslParticipant(saslClient);
+   this.channel = SocketChannel.open(address);
+   boolean negotiated = false;
+   try {
+     this.channel.socket().setTcpNoDelay(true);
+     LOG.debug("open to {}", getRemoteName());
+     open(true);
+     negotiated = true;
+   } finally {
+     if (!negotiated) {                  // don't leak the socket on failure
+       try {
+         this.channel.close();
+       } catch (IOException ignored) {
+         // the failure that brought us here is the one worth reporting
+       }
+     }
+   }
+ }
+
+ /** Create using the specified {@link SaslServer}.  Runs the server side
+ * of SASL negotiation on <code>channel</code> before returning; this
+ * constructor does not close the channel itself if negotiation throws. */
+ public SaslSocketTransceiver(SocketChannel channel, SaslServer saslServer)
+ throws IOException {
+ this.sasl = new SaslParticipant(saslServer);
+ this.channel = channel;
+ LOG.debug("open from {}", getRemoteName());
+ open(false);
+ }
+
+ @Override public boolean isConnected() { return remote != null; } // true once setRemote() has stored a non-null protocol
+
+ @Override public void setRemote(Protocol remote) { // stores the protocol; isConnected() tests it for null
+ this.remote = remote;
+ }
+
+ @Override public Protocol getRemote() { // value last given to setRemote(), or null if none
+ return remote;
+ }
+ @Override public String getRemoteName() { // string form of the channel's remote socket address
+ return channel.socket().getRemoteSocketAddress().toString();
+ }
+
+ /** Sends a request and returns its response.  If the client's START
+  * message was written without waiting for the server's reply, that reply
+  * is consumed here first: COMPLETE lets the call proceed, FAIL or any
+  * other status aborts it with an exception. */
+ @Override
+ public synchronized List<ByteBuffer> transceive(List<ByteBuffer> request)
+   throws IOException {
+   if (!saslResponsePiggybacked)
+     return super.transceive(request);
+
+   saslResponsePiggybacked = false;            // consume the reply only once
+   Status reply = readStatus();
+   ByteBuffer payload = readFrame();
+   if (reply == Status.FAIL)
+     throw new SaslException("Fail: "+toString(payload));
+   if (reply != Status.COMPLETE)
+     throw new IOException("Unexpected SASL status: "+reply);
+   return super.transceive(request);
+ }
+
+ private void open(boolean isClient) throws IOException { // runs SASL negotiation; when isClient is true this side sends START first
+ LOG.debug("beginning SASL negotiation");
+
+ if (isClient) {
+ ByteBuffer response = EMPTY;
+ if (sasl.client.hasInitialResponse())
+ response = ByteBuffer.wrap(sasl.evaluate(response.array()));
+ write(Status.START, sasl.getMechanismName(), response);
+ if (sasl.isComplete()) // client already done: the server's reply is read later, in transceive()
+ saslResponsePiggybacked = true;
+ }
+
+ while (!sasl.isComplete()) {
+ Status status = readStatus();
+ ByteBuffer frame = readFrame();
+ switch (status) {
+ case START:
+ String mechanism = toString(frame); // first frame of START is the mechanism name
+ frame = readFrame(); // second frame is the initial payload
+ if (!mechanism.equalsIgnoreCase(sasl.getMechanismName())) {
+ write(Status.FAIL, "Wrong mechanism: "+mechanism);
+ throw new SaslException("Wrong mechanism: "+mechanism);
+ }
+ case CONTINUE: // no break above: START falls through so its payload is evaluated here
+ byte[] response;
+ try {
+ response = sasl.evaluate(frame.array());
+ status = sasl.isComplete() ? Status.COMPLETE : Status.CONTINUE;
+ } catch (SaslException e) {
+ response = e.toString().getBytes("UTF-8");
+ status = Status.FAIL; // NOTE(review): after FAIL is written the loop keeps reading rather than aborting — confirm this is intended
+ }
+ write(status, response!=null ? ByteBuffer.wrap(response) : EMPTY);
+ break;
+ case COMPLETE:
+ sasl.evaluate(frame.array());
+ if (!sasl.isComplete())
+ throw new SaslException("Expected completion!");
+ break;
+ case FAIL:
+ throw new SaslException("Fail: "+toString(frame));
+ default:
+ throw new IOException("Unexpected SASL status: "+status);
+ }
+ }
+ LOG.debug("SASL opened");
+
+ String qop = (String) sasl.getNegotiatedProperty(Sasl.QOP);
+ LOG.debug("QOP = {}", qop);
+ dataIsWrapped = (qop != null && !qop.equalsIgnoreCase("auth")); // any QOP other than "auth" turns on wrap/unwrap of frames
+ }
+
+ /** Decodes the whole backing array of <code>buffer</code> as UTF-8.
+  * @throws IOException wrapping the encoding failure, should UTF-8 be
+  * unavailable */
+ private String toString(ByteBuffer buffer) throws IOException {
+   byte[] utf8 = buffer.array();
+   String decoded;
+   try {
+     decoded = new String(utf8, "UTF-8");
+   } catch (UnsupportedEncodingException unsupported) {
+     throw new IOException(unsupported.toString(), unsupported);
+   }
+   return decoded;
+ }
+
+ /** Reads frames until a zero-length frame arrives and returns the
+  * non-empty frames in the order they were read. */
+ @Override public synchronized List<ByteBuffer> readBuffers()
+   throws IOException {
+   List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+   ByteBuffer frame = readFrameAndUnwrap();
+   while (frame.remaining() != 0) {
+     frames.add(frame);
+     frame = readFrameAndUnwrap();
+   }
+   return frames;
+ }
+
+ /** Reads the one-byte SASL status that precedes a negotiation message.
+  * @return the {@link Status} whose ordinal equals the byte read
+  * @throws IOException if the byte is not the ordinal of any status */
+ private Status readStatus() throws IOException {
+   ByteBuffer buffer = ByteBuffer.allocate(1);
+   read(buffer);
+   int status = buffer.get();
+   Status[] known = Status.values();
+   // The byte is signed, and an index equal to the length is already out
+   // of range, so reject both instead of indexing past the array.
+   if (status < 0 || status >= known.length)
+     throw new IOException("Unexpected SASL status byte: "+status);
+   return known[status];
+ }
+
+ /** Reads one frame and, when a QOP is in force, unwraps it.  A
+  * zero-length frame is returned as read: writeBuffers() sends that
+  * terminator without wrapping it, so it must not be unwrapped. */
+ private ByteBuffer readFrameAndUnwrap() throws IOException {
+   ByteBuffer frame = readFrame();
+   if (!dataIsWrapped || frame.remaining() == 0)
+     return frame;
+   ByteBuffer unwrapped = ByteBuffer.wrap(sasl.unwrap(frame.array()));
+   LOG.debug("unwrapped data of length: {}", unwrapped.remaining());
+   return unwrapped;
+ }
+
+ private ByteBuffer readFrame() throws IOException {
+ read(readHeader);
+ ByteBuffer buffer = ByteBuffer.allocate(readHeader.getInt());
+ LOG.debug("about to read: {} bytes", buffer.capacity());
+ read(buffer);
+ return buffer;
+ }
+
+ /** Fills <code>buffer</code> completely from the channel and flips it
+  * for reading.
+  * @throws EOFException if the channel ends before the buffer is full */
+ private void read(ByteBuffer buffer) throws IOException {
+   for (buffer.clear(); buffer.hasRemaining(); ) {
+     int count = channel.read(buffer);
+     if (count == -1)
+       throw new EOFException();
+   }
+   buffer.flip();
+ }
+
+ @Override public synchronized void writeBuffers(List<ByteBuffer> buffers)
+ throws IOException { // frames the buffers, wrapping each when dataIsWrapped is set, and ends with a zero-length frame
+ if (buffers == null) return; // no data to write
+ List<ByteBuffer> writes = new ArrayList<ByteBuffer>(buffers.size()*2+1);
+ int currentLength = 0;
+ ByteBuffer currentHeader = writeHeader; // length header of the frame currently being extended
+ for (ByteBuffer buffer : buffers) { // gather writes
+ if (buffer.remaining() == 0) continue; // ignore empties
+ if (dataIsWrapped) {
+ LOG.debug("wrapping data of length: {}", buffer.remaining());
+ buffer = ByteBuffer.wrap(sasl.wrap(buffer.array(), buffer.position(),
+ buffer.remaining())); // NOTE(review): uses position() without arrayOffset() — assumes buffers are not slices; confirm
+ }
+ int length = buffer.remaining();
+ if (!dataIsWrapped // can append buffers on wire
+ && (currentLength + length) <= ByteBufferOutputStream.BUFFER_SIZE) {
+ if (currentLength == 0)
+ writes.add(currentHeader);
+ currentLength += length;
+ currentHeader.clear();
+ currentHeader.putInt(currentLength); // rewrite the already-queued header with the running total
+ LOG.debug("adding {} to write, total now {}", length, currentLength);
+ } else { // wrapped data, or the frame would exceed BUFFER_SIZE: start a new frame with its own header
+ currentLength = length;
+ currentHeader = ByteBuffer.allocate(4).putInt(length);
+ writes.add(currentHeader);
+ LOG.debug("planning write of {}", length);
+ }
+ currentHeader.flip();
+ writes.add(buffer);
+ }
+ zeroHeader.flip(); // zero-terminate
+ writes.add(zeroHeader);
+
+ channel.write(writes.toArray(new ByteBuffer[writes.size()])); // one gathering write of all headers and data
+ }
+
+ private void write(Status status, String prefix, ByteBuffer response) // writes the status byte, prefix as one frame, then response as a second frame
+ throws IOException {
+ LOG.debug("write status: {} {}", status, prefix);
+ write(status, prefix);
+ write(response);
+ }
+
+ /** Writes the status byte followed by <code>response</code>, encoded as
+  * UTF-8, in a single frame. */
+ private void write(Status status, String response) throws IOException {
+   byte[] utf8 = response.getBytes("UTF-8");
+   write(status, ByteBuffer.wrap(utf8));
+ }
+
+ /** Writes the status ordinal as a single byte, then <code>response</code>
+  * as a length-prefixed frame. */
+ private void write(Status status, ByteBuffer response) throws IOException {
+   LOG.debug("write status: {}", status);
+   byte code = (byte) status.ordinal();
+   channel.write(ByteBuffer.wrap(new byte[] { code }));
+   write(response);
+ }
+
+ /** Writes <code>response</code> as one frame: its remaining byte count as
+  * a 4-byte header, then the bytes, in a single gathering write. */
+ private void write(ByteBuffer response) throws IOException {
+   int length = response.remaining();
+   LOG.debug("writing: {}", length);
+   writeHeader.clear();
+   writeHeader.putInt(length);
+   writeHeader.flip();
+   ByteBuffer[] headerThenPayload = { writeHeader, response };
+   channel.write(headerThenPayload);
+ }
+
+ /** Closes the channel if it is still open and disposes of the SASL
+  * participant.  The participant is disposed even when closing the
+  * channel throws.
+  * @throws IOException if closing the channel or disposing fails */
+ @Override public void close() throws IOException {
+   try {
+     if (channel.isOpen()) {
+       LOG.info("closing to "+getRemoteName());
+       channel.close();
+     }
+   } finally {
+     sasl.dispose();                    // always release SASL resources
+   }
+ }
+
+ /**
+  * Adapter that lets the transceiver drive either a <code>SaslServer</code>
+  * or a <code>SaslClient</code> through one set of methods, since the two
+  * interfaces overlap heavily but have no common supertype.  Every method
+  * forwards to the client when one is set and to the server otherwise.
+  */
+ private static class SaslParticipant {
+   // Each constructor sets only one of these; the other stays null.
+   public SaslServer server;
+   public SaslClient client;
+
+   public SaslParticipant(SaslServer server) {
+     this.server = server;
+   }
+
+   public SaslParticipant(SaslClient client) {
+     this.client = client;
+   }
+
+   public String getMechanismName() {
+     return client == null
+       ? server.getMechanismName()
+       : client.getMechanismName();
+   }
+
+   public boolean isComplete() {
+     return client == null ? server.isComplete() : client.isComplete();
+   }
+
+   public void dispose() throws SaslException {
+     if (client == null) {
+       server.dispose();
+       return;
+     }
+     client.dispose();
+   }
+
+   /** Unwraps the whole of <code>buf</code>. */
+   public byte[] unwrap(byte[] buf) throws SaslException {
+     return client == null
+       ? server.unwrap(buf, 0, buf.length)
+       : client.unwrap(buf, 0, buf.length);
+   }
+
+   /** Wraps the whole of <code>buf</code>. */
+   public byte[] wrap(byte[] buf) throws SaslException {
+     return wrap(buf, 0, buf.length);
+   }
+
+   public byte[] wrap(byte[] buf, int start, int len) throws SaslException {
+     return client == null
+       ? server.wrap(buf, start, len)
+       : client.wrap(buf, start, len);
+   }
+
+   public Object getNegotiatedProperty(String propName) {
+     return client == null
+       ? server.getNegotiatedProperty(propName)
+       : client.getNegotiatedProperty(propName);
+   }
+
+   /** Feeds <code>buf</code> to the client as a challenge, or to the server
+    * as a response, and returns what that side produces. */
+   public byte[] evaluate(byte[] buf) throws SaslException {
+     return client == null
+       ? server.evaluateResponse(buf)
+       : client.evaluateChallenge(buf);
+   }
+
+ }
+
+ private static class AnonymousClient implements SaslClient {
+ public String getMechanismName() { return "ANONYMOUS"; }
+ public boolean hasInitialResponse() { return true; }
+ public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
+ try {
+ return System.getProperty("user.name").getBytes("UTF-8");
+ } catch (IOException e) {
+ throw new SaslException(e.toString());
+ }
+ }
+ public boolean isComplete() { return true; }
+ public byte[] unwrap(byte[] incoming, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+ public byte[] wrap(byte[] outgoing, int offset, int len) {
+ throw new UnsupportedOperationException();
+ }
+ public Object getNegotiatedProperty(String propName) { return null; }
+ public void dispose() {}
+ }
+}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java?rev=998347&r1=998346&r2=998347&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java Fri Sep 17 22:02:48 2010
@@ -19,6 +19,7 @@
package org.apache.avro.ipc;
import java.io.IOException;
+import java.io.EOFException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
@@ -55,16 +56,22 @@ public class SocketServer extends Thread
public void run() {
LOG.info("starting "+channel.socket().getInetAddress());
- while (true) {
+ try {
+ while (true) {
+ try {
+ new Connection(channel.accept());
+ } catch (ClosedChannelException e) {
+ return;
+ } catch (IOException e) {
+ LOG.warn("unexpected error", e);
+ throw new RuntimeException(e);
+ }
+ }
+ } finally {
+ LOG.info("stopping "+channel.socket().getInetAddress());
try {
- new Connection(channel.accept());
- } catch (ClosedChannelException e) {
- return;
+ channel.close();
} catch (IOException e) {
- LOG.warn("unexpected error", e);
- throw new RuntimeException(e);
- } finally {
- LOG.info("stopping "+channel.socket().getInetAddress());
}
}
}
@@ -73,10 +80,20 @@ public class SocketServer extends Thread
group.interrupt();
}
- private class Connection extends SocketTransceiver implements Runnable {
+ /** Creates an appropriate {@link Transceiver} for this server.
+ * Returns a {@link SocketTransceiver} by default. */
+ protected Transceiver getTransceiver(SocketChannel channel)
+ throws IOException {
+ return new SocketTransceiver(channel);
+ }
+
+ private class Connection implements Runnable {
+
+ SocketChannel channel;
+ Transceiver xc;
public Connection(SocketChannel channel) throws IOException {
- super(channel);
+ this.channel = channel;
Thread thread = new Thread(group, this);
thread.setName("Connection to "+channel.socket().getRemoteSocketAddress());
@@ -87,13 +104,16 @@ public class SocketServer extends Thread
public void run() {
try {
try {
+ this.xc = getTransceiver(channel);
while (true) {
- writeBuffers(responder.respond(readBuffers(), this));
+ xc.writeBuffers(responder.respond(xc.readBuffers(), xc));
}
+ } catch (EOFException e) {
+ return;
} catch (ClosedChannelException e) {
return;
} finally {
- close();
+ channel.close();
}
} catch (IOException e) {
LOG.warn("unexpected error", e);
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java?rev=998347&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java Fri Sep 17 22:02:48 2010
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.TestProtocolGeneric;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.Before;
+
+public class TestSaslAnonymous extends TestProtocolGeneric {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSaslAnonymous.class);
+
+ @Before
+ public void testStartServer() throws Exception {
+ if (server != null) return;
+ server = new SaslSocketServer(new TestResponder(),new InetSocketAddress(0));
+ server.start();
+ client = new SaslSocketTransceiver(new InetSocketAddress(server.getPort()));
+ requestor = new GenericRequestor(PROTOCOL, client);
+ }
+
+ @Override public void testHandshake() throws IOException {}
+
+}
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java?rev=998347&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java Fri Sep 17 22:02:48 2010
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.net.InetSocketAddress;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.TestProtocolGeneric;
+import org.apache.avro.util.Utf8;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+public class TestSaslDigestMd5 extends TestProtocolGeneric {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSaslDigestMd5.class);
+
+ private static final String HOST = "localhost";
+ private static final String SERVICE = "avro-test";
+ private static final String PRINCIPAL = "avro-test-principal";
+ private static final String PASSWORD = "super secret password";
+ private static final String REALM = "avro-test-realm";
+
+ private static final String DIGEST_MD5_MECHANISM = "DIGEST-MD5";
+ private static final Map<String, String> DIGEST_MD5_PROPS =
+ new HashMap<String, String>();
+
+ static {
+ DIGEST_MD5_PROPS.put(Sasl.QOP, "auth-int");
+ DIGEST_MD5_PROPS.put("com.sun.security.sasl.digest.realm", REALM);
+ }
+
+ private static class TestSaslCallbackHandler implements CallbackHandler {
+ @Override
+ public void handle(Callback[] callbacks)
+ throws IOException, UnsupportedCallbackException {
+ for (Callback c : callbacks) {
+ if (c instanceof NameCallback) {
+ ((NameCallback) c).setName(PRINCIPAL);
+ } else if (c instanceof PasswordCallback) {
+ ((PasswordCallback) c).setPassword(PASSWORD.toCharArray());
+ } else if (c instanceof AuthorizeCallback) {
+ ((AuthorizeCallback) c).setAuthorized(true);
+ } else if (c instanceof RealmCallback) {
+ ((RealmCallback) c).setText(REALM);
+ } else {
+ throw new UnsupportedCallbackException(c);
+ }
+ }
+ }
+ }
+
+ @Before
+ public void testStartServer() throws Exception {
+ if (server != null) return;
+ server = new SaslSocketServer
+ (new TestResponder(), new InetSocketAddress(0), DIGEST_MD5_MECHANISM,
+ SERVICE, HOST, DIGEST_MD5_PROPS, new TestSaslCallbackHandler());
+ server.start();
+ SaslClient saslClient = Sasl.createSaslClient
+ (new String[]{DIGEST_MD5_MECHANISM}, PRINCIPAL, SERVICE, HOST,
+ DIGEST_MD5_PROPS, new TestSaslCallbackHandler());
+ client = new SaslSocketTransceiver(new InetSocketAddress(server.getPort()),
+ saslClient);
+ requestor = new GenericRequestor(PROTOCOL, client);
+ }
+
+ @Test(expected=SaslException.class)
+ public void testAnonymousClient() throws Exception {
+ Server s = new SaslSocketServer
+ (new TestResponder(), new InetSocketAddress(0), DIGEST_MD5_MECHANISM,
+ SERVICE, HOST, DIGEST_MD5_PROPS, new TestSaslCallbackHandler());
+ s.start();
+ Transceiver c =
+ new SaslSocketTransceiver(new InetSocketAddress(s.getPort()));
+ GenericRequestor requestor = new GenericRequestor(PROTOCOL, c);
+ GenericRecord params =
+ new GenericData.Record(PROTOCOL.getMessages().get("hello").getRequest());
+ params.put("greeting", "bob");
+ Utf8 response = (Utf8)requestor.request("hello", params);
+ assertEquals(new Utf8("goodbye"), response);
+ s.close();
+ c.close();
+ }
+
+
+ private static class WrongPasswordCallbackHandler implements CallbackHandler {
+ @Override
+ public void handle(Callback[] callbacks)
+ throws IOException, UnsupportedCallbackException {
+ for (Callback c : callbacks) {
+ if (c instanceof NameCallback) {
+ ((NameCallback) c).setName(PRINCIPAL);
+ } else if (c instanceof PasswordCallback) {
+ ((PasswordCallback) c).setPassword("wrong".toCharArray());
+ } else if (c instanceof AuthorizeCallback) {
+ ((AuthorizeCallback) c).setAuthorized(true);
+ } else if (c instanceof RealmCallback) {
+ ((RealmCallback) c).setText(REALM);
+ } else {
+ throw new UnsupportedCallbackException(c);
+ }
+ }
+ }
+ }
+
+ @Test(expected=SaslException.class)
+ public void testWrongPassword() throws Exception {
+ Server s = new SaslSocketServer
+ (new TestResponder(), new InetSocketAddress(0), DIGEST_MD5_MECHANISM,
+ SERVICE, HOST, DIGEST_MD5_PROPS, new TestSaslCallbackHandler());
+ s.start();
+ SaslClient saslClient = Sasl.createSaslClient
+ (new String[]{DIGEST_MD5_MECHANISM}, PRINCIPAL, SERVICE, HOST,
+ DIGEST_MD5_PROPS, new WrongPasswordCallbackHandler());
+ Transceiver c = new SaslSocketTransceiver
+ (new InetSocketAddress(server.getPort()), saslClient);
+ GenericRequestor requestor = new GenericRequestor(PROTOCOL, c);
+ GenericRecord params =
+ new GenericData.Record(PROTOCOL.getMessages().get("hello").getRequest());
+ params.put("greeting", "bob");
+ Utf8 response = (Utf8)requestor.request("hello", params);
+ assertEquals(new Utf8("goodbye"), response);
+ s.close();
+ c.close();
+ }
+
+ @Override public void testHandshake() throws IOException {}
+
+}