You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/09/18 00:02:49 UTC

svn commit: r998347 - in /avro/trunk: ./ doc/src/content/xdocs/ lang/java/src/java/org/apache/avro/ipc/ lang/java/src/test/java/org/apache/avro/ipc/

Author: cutting
Date: Fri Sep 17 22:02:48 2010
New Revision: 998347

URL: http://svn.apache.org/viewvc?rev=998347&view=rev
Log:
AVRO-641. Java: Add SASL security for socket-based RPC.

Added:
    avro/trunk/doc/src/content/xdocs/sasl.xml
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketServer.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketTransceiver.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/doc/src/content/xdocs/site.xml
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=998347&r1=998346&r2=998347&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Sep 17 22:02:48 2010
@@ -6,6 +6,10 @@ Avro 1.5.0 (unreleased)
 
 Avro 1.4.1 (unreleased)
 
+  NEW FEATURES
+
+    AVRO-641. Java: Add SASL security for socket-based RPC. (cutting)
+
   IMPROVEMENTS
 
     AVRO-655. Change build so that 'dist' target no longer also runs C

Added: avro/trunk/doc/src/content/xdocs/sasl.xml
URL: http://svn.apache.org/viewvc/avro/trunk/doc/src/content/xdocs/sasl.xml?rev=998347&view=auto
==============================================================================
--- avro/trunk/doc/src/content/xdocs/sasl.xml (added)
+++ avro/trunk/doc/src/content/xdocs/sasl.xml Fri Sep 17 22:02:48 2010
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
+<document>
+  <header>
+    <title>Avro SASL profile</title>
+  </header>
+  <body>
+    <section id="intro">
+      <title>Introduction</title>
+      <p>SASL (<a href="http://www.ietf.org/rfc/rfc2222.txt">RFC 2222</a>)
+      provides a framework for authentication and security of network
+      protocols.  Each protocol that uses SASL is meant to define a
+      SASL <em>profile</em>.  This document provides a SASL profile
+      for connection-based Avro RPC.</p>
+    </section>
+
+    <section id="overview">
+      <title>Overview</title>
+      <p>SASL negotiation proceeds as a series of message interactions
+      over a connection between a client and server using a selected
+      SASL <em>mechanism</em>.  The client starts this negotiation by
+      sending its chosen mechanism name with an initial (possibly
+      empty) message.  Negotiation proceeds with the exchange of
+      messages until either side indicates success or failure.  The
+      content of the messages is mechanism-specific.  If the
+      negotiation succeeds, then the session can proceed over the
+      connection, otherwise it must be abandoned.</p>
+      <p>Some mechanisms continue to process session data after
+      negotiation (e.g., encrypting it), while some specify that
+      further session data is transmitted unmodifed.</p>
+    </section>
+
+    <section id="negotiation">
+      <title>Negotiation</title>
+      <section id="commands">
+	<title>Commands</title>
+	<p>Avro SASL negotiation uses four one-byte commands.</p>
+	<ul>
+	  <li><code>0: START</code>  Used in a client's initial message.</li>
+	  <li><code>1: CONTINUE</code> Used while negotiation is ongoing.</li>
+	  <li><code>2: FAIL</code> Terminates negotiation unsuccessfully.</li>
+	  <li><code>3: COMPLETE</code> Terminates negotiation sucessfully.</li>
+	</ul>
+	
+	<p>The format of a START message is:</p>
+	<source>| 0 | 4-byte mechanism name length | mechanism name | 4-byte payload length | payload data |</source>
+	
+	<p>The format of a CONTINUE message is:</p>
+	<source>| 1 | 4-byte payload length | payload data |</source>
+	
+	<p>The format of a FAIL message is:</p>
+	<source>| 2 | 4-byte message length | UTF-8 message |</source>
+	
+	<p>The format of a COMPLETE message is:</p>
+	<source>| 3 | 4-byte payload length | payload data |</source>
+      </section>
+
+      <section id="process">
+	<title>Process</title>
+	<p>Negotiation is initiated by a client sending a START command
+	  containing the client's chosen mechanism name and any
+	  mechanism-specific payload data.</p>
+	
+	<p>The server and client then interchange some number
+	  (possibly zero) of CONTINUE messages.  Each message contains
+	  payload data that is processed by the security mechanism to
+	  generate the next message.</p>
+	
+	<p>Once either the client or server send a FAIL message then
+	  negotiation has failed.  UTF-8-encoded text is included in
+	  the failure message.  Once either a FAIL message has been
+	  sent or recieved, or any other error occurs in the
+	  negotiation, further communication on this connection must
+	  cease.</p>
+	
+	<p>Once either the client or server send a COMPLETE message
+	  then negotiation has completed successfully.  Session data
+	  may now be transmitted over the connection until it is
+	  closed by either side.</p>
+      </section>
+
+    </section>
+
+    <section id="session">
+      <title>Session Data</title>
+      <p>If no SASL QOP (quality of protection) is negotiated, then
+	all subsequent writes to/reads over this connection are
+	written/read unmodified.  In particular, messages use
+	Avro <a href="spec.html#Message+Framing">framing</a>, and are
+	of the form:</p>
+      <source>| 4-byte frame length | frame data | ... | 4 zero bytes |</source>
+      <p>If a SASL QOP is negotiated, then it must be used by the
+	connection for all subsequent messages. This is done by
+	wrapping each non-empty frame written using the security
+	mechanism and unwrapping each non-empty frame read.  The
+	length written in each non-empty frame is the length of the
+	wrapped data. Complete frames must be passed to the security
+	mechanism for unwrapping.  Unwrapped data is then passed to
+	the application as the content of the frame.</p>
+      <p>If at any point processing fails due to wrapping, unwrapping
+	or framing errors, then all further communication on this
+	connection must cease.</p>
+    </section>
+
+    <section id="anonymous">
+      <title>Anonymous Mechanism</title>
+      <p>The SASL anonymous mechanism
+      (<a href="http://www.ietf.org/rfc/rfc2222.txt">RFC 2245</a>) is
+      quite simple to implement.  In particular, an initial anonymous
+      request may be prefixed by the following static sequence:</p>
+      <source>| 0 | 0009 | ANONYMOUS | 0000 |</source>
+      <p>If a server uses the anonymous mechanism, it should check
+      that the mechanism name in the start message prefixing the first
+      request recieved is 'ANONYMOUS', then simply prefix its initial
+      response with a COMPLETE message of:</p>
+      <source>| 3 | 0000 |</source>
+      <p>If an anonymous server recieves some other mechanism name,
+      then it may respond with a FAIL message as simple as:</p>
+      <source>| 2 | 0000 |</source>
+      <p>Note that the anonymous mechanism need add no additional
+      round-trip messages between client and server.  The START
+      message can be piggybacked on the initial request and the
+      COMPLETE or FAIL message can be piggybacked on the initial
+      response.</p>
+    </section>
+
+  </body>
+</document>

Modified: avro/trunk/doc/src/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/avro/trunk/doc/src/content/xdocs/site.xml?rev=998347&r1=998346&r2=998347&view=diff
==============================================================================
--- avro/trunk/doc/src/content/xdocs/site.xml (original)
+++ avro/trunk/doc/src/content/xdocs/site.xml Fri Sep 17 22:02:48 2010
@@ -46,6 +46,7 @@ See http://forrest.apache.org/docs/linki
     <c-api      label="C API"             href="ext:api/c/index" />
     <cpp-api    label="C++ API"           href="ext:api/cpp/index" />
     <idl        label="IDL language"      href="idl.html" />
+    <sasl       label="SASL profile"      href="sasl.html" />
     <wiki       label="Wiki"              href="ext:wiki" />
     <faq        label="FAQ"               href="ext:faq" />
   </docs>

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketServer.java?rev=998347&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketServer.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketServer.java Fri Sep 17 22:02:48 2010
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.util.Map;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import javax.security.auth.callback.CallbackHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link Server} that uses {@link javax.security.sasl} for authentication
+ * and encryption. */
+public class SaslSocketServer extends SocketServer {
+  private static final Logger LOG = LoggerFactory.getLogger(SaslServer.class);
+
+  private static abstract class SaslServerFactory {
+    protected abstract SaslServer getServer() throws SaslException;
+  }
+
+  private SaslServerFactory factory;
+
+  /** Create using SASL's anonymous (<a
+   * href="http://www.ietf.org/rfc/rfc2245.txt">RFC 2245) mechanism. */
+  public SaslSocketServer(Responder responder, SocketAddress addr)
+    throws IOException {
+    this(responder, addr,
+         new SaslServerFactory() {
+           public SaslServer getServer() { return new AnonymousServer(); }
+         });
+  }
+
+  /** Create using the specified {@link SaslServer} parameters. */
+  public SaslSocketServer(Responder responder, SocketAddress addr,
+                          final String mechanism, final String protocol,
+                          final String serverName, final Map<String,?> props,
+                          final CallbackHandler cbh) throws IOException {
+    this(responder, addr,
+         new SaslServerFactory() {
+           public SaslServer getServer() throws SaslException {
+             return Sasl.createSaslServer(mechanism, protocol, serverName,
+                                          props, cbh);
+           }
+         });
+  }
+
+  private SaslSocketServer(Responder responder, SocketAddress addr,
+                           SaslServerFactory factory) throws IOException {
+    super(responder, addr);
+    this.factory = factory;
+  }
+
+  @Override protected Transceiver getTransceiver(SocketChannel channel)
+    throws IOException {
+    return new SaslSocketTransceiver(channel, factory.getServer());
+  }
+
+  private static class AnonymousServer implements SaslServer {
+    private String user;
+    public String getMechanismName() { return "ANONYMOUS"; }
+    public byte[] evaluateResponse(byte[] response) throws SaslException {
+      try {
+        this.user = new String(response, "UTF-8");
+      } catch (IOException e) {
+        throw new SaslException(e.toString());
+      }
+      return null;
+    }
+    public boolean isComplete() { return user != null; }
+    public String getAuthorizationID() { return user; }
+    public byte[] unwrap(byte[] incoming, int offset, int len) {
+      throw new UnsupportedOperationException();
+    }
+    public byte[] wrap(byte[] outgoing, int offset, int len) {
+      throw new UnsupportedOperationException();
+    }
+    public Object getNegotiatedProperty(String propName) { return null; }
+    public void dispose() {}
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketTransceiver.java?rev=998347&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketTransceiver.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SaslSocketTransceiver.java Fri Sep 17 22:02:48 2010
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.UnsupportedEncodingException;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
+
+import org.apache.avro.Protocol;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link Transceiver} that uses {@link javax.security.sasl} for
+ * authentication and encryption. */
+public class SaslSocketTransceiver extends Transceiver {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(SaslSocketTransceiver.class);
+
+  private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
+
+  private static enum Status { START, CONTINUE, FAIL, COMPLETE }
+
+  private SaslParticipant sasl;
+  private SocketChannel channel;
+  private boolean dataIsWrapped;
+  private boolean saslResponsePiggybacked;
+
+  private Protocol remote;
+  
+  private ByteBuffer readHeader = ByteBuffer.allocate(4);
+  private ByteBuffer writeHeader = ByteBuffer.allocate(4);
+  private ByteBuffer zeroHeader = ByteBuffer.allocate(4).putInt(0);
+
+  /** Create using SASL's anonymous (<a
+   * href="http://www.ietf.org/rfc/rfc2245.txt">RFC 2245) mechanism. */
+  public SaslSocketTransceiver(SocketAddress address) throws IOException {
+    this(address, new AnonymousClient());
+  }
+
+  /** Create using the specified {@link SaslClient}. */
+  public SaslSocketTransceiver(SocketAddress address, SaslClient saslClient)
+    throws IOException {
+    this.sasl = new SaslParticipant(saslClient);
+    this.channel = SocketChannel.open(address);
+    this.channel.socket().setTcpNoDelay(true);
+    LOG.debug("open to {}", getRemoteName());
+    open(true);
+  }
+
+  /** Create using the specified {@link SaslServer}. */
+  public SaslSocketTransceiver(SocketChannel channel, SaslServer saslServer)
+    throws IOException {
+    this.sasl = new SaslParticipant(saslServer);
+    this.channel = channel;
+    LOG.debug("open from {}", getRemoteName());
+    open(false);
+  }
+
+  @Override public boolean isConnected() { return remote != null; }
+
+  @Override public void setRemote(Protocol remote) {
+    this.remote = remote;
+  }
+
+  @Override public Protocol getRemote() {
+    return remote;
+  }
+  @Override public String getRemoteName() {
+    return channel.socket().getRemoteSocketAddress().toString();
+  }
+
+  @Override
+  public synchronized List<ByteBuffer> transceive(List<ByteBuffer> request)
+    throws IOException {
+    if (saslResponsePiggybacked) {                // still need to read response
+      saslResponsePiggybacked = false;
+      Status status  = readStatus();
+      ByteBuffer frame = readFrame();
+      switch (status) {
+      case COMPLETE:
+        break;
+      case FAIL:
+        throw new SaslException("Fail: "+toString(frame));
+      default:
+        throw new IOException("Unexpected SASL status: "+status);
+      }
+    }
+    return super.transceive(request);
+  }
+
+  private void open(boolean isClient) throws IOException {
+    LOG.debug("beginning SASL negotiation");
+
+    if (isClient) {
+      ByteBuffer response = EMPTY;
+      if (sasl.client.hasInitialResponse())
+        response = ByteBuffer.wrap(sasl.evaluate(response.array()));
+      write(Status.START, sasl.getMechanismName(), response);
+      if (sasl.isComplete())
+        saslResponsePiggybacked = true;
+    }
+    
+    while (!sasl.isComplete()) {
+      Status status  = readStatus();
+      ByteBuffer frame = readFrame();
+      switch (status) {
+      case START:
+        String mechanism = toString(frame);
+        frame = readFrame();
+        if (!mechanism.equalsIgnoreCase(sasl.getMechanismName())) {
+          write(Status.FAIL, "Wrong mechanism: "+mechanism);
+          throw new SaslException("Wrong mechanism: "+mechanism);
+        }
+      case CONTINUE: 
+        byte[] response;
+        try {
+          response = sasl.evaluate(frame.array());
+          status = sasl.isComplete() ? Status.COMPLETE : Status.CONTINUE;
+        } catch (SaslException e) {
+          response = e.toString().getBytes("UTF-8");
+          status = Status.FAIL;
+        }
+        write(status, response!=null ? ByteBuffer.wrap(response) : EMPTY);
+        break;
+      case COMPLETE:
+        sasl.evaluate(frame.array());
+        if (!sasl.isComplete())
+          throw new SaslException("Expected completion!");
+        break;
+      case FAIL:
+        throw new SaslException("Fail: "+toString(frame));
+      default:
+        throw new IOException("Unexpected SASL status: "+status);
+      }
+    }
+    LOG.debug("SASL opened");
+
+    String qop = (String) sasl.getNegotiatedProperty(Sasl.QOP);
+    LOG.debug("QOP = {}", qop);
+    dataIsWrapped = (qop != null && !qop.equalsIgnoreCase("auth"));
+  }
+
+  private String toString(ByteBuffer buffer) throws IOException {
+    try {
+      return new String(buffer.array(), "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new IOException(e.toString(), e);
+    }
+  }
+
+  @Override public synchronized List<ByteBuffer> readBuffers()
+    throws IOException {
+    List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+    while (true) {
+      ByteBuffer buffer = readFrameAndUnwrap();
+      if (buffer.remaining() == 0)
+        return buffers;
+      buffers.add(buffer);
+    }
+  }
+
+  private Status readStatus() throws IOException {
+    ByteBuffer buffer = ByteBuffer.allocate(1);
+    read(buffer);
+    int status = buffer.get();
+    if (status > Status.values().length)
+      throw new IOException("Unexpected SASL status byte: "+status);
+    return Status.values()[status];
+  }
+
+  private ByteBuffer readFrameAndUnwrap() throws IOException {
+    ByteBuffer frame = readFrame();
+    if (!dataIsWrapped)
+      return frame;
+    ByteBuffer unwrapped = ByteBuffer.wrap(sasl.unwrap(frame.array()));
+    LOG.debug("unwrapped data of length: {}", unwrapped.remaining());
+    return unwrapped;
+  }
+
+  private ByteBuffer readFrame() throws IOException {
+    read(readHeader);
+    ByteBuffer buffer = ByteBuffer.allocate(readHeader.getInt());
+    LOG.debug("about to read: {} bytes", buffer.capacity());
+    read(buffer);
+    return buffer;
+  }
+  
+  private void read(ByteBuffer buffer) throws IOException {
+    buffer.clear();
+    while (buffer.hasRemaining())
+      if (channel.read(buffer) == -1)
+        throw new EOFException();
+    buffer.flip();
+  }
+
+  @Override public synchronized void writeBuffers(List<ByteBuffer> buffers)
+    throws IOException {
+    if (buffers == null) return;                  // no data to write
+    List<ByteBuffer> writes = new ArrayList<ByteBuffer>(buffers.size()*2+1);
+    int currentLength = 0;
+    ByteBuffer currentHeader = writeHeader;
+    for (ByteBuffer buffer : buffers) {           // gather writes
+      if (buffer.remaining() == 0) continue;      // ignore empties
+      if (dataIsWrapped) {
+        LOG.debug("wrapping data of length: {}", buffer.remaining());
+        buffer = ByteBuffer.wrap(sasl.wrap(buffer.array(), buffer.position(),
+                                           buffer.remaining()));
+      }
+      int length = buffer.remaining();
+      if (!dataIsWrapped                          // can append buffers on wire
+          && (currentLength + length) <= ByteBufferOutputStream.BUFFER_SIZE) {
+        if (currentLength == 0)
+          writes.add(currentHeader);
+        currentLength += length;
+        currentHeader.clear();
+        currentHeader.putInt(currentLength);
+        LOG.debug("adding {} to write, total now {}", length, currentLength);
+      } else {
+        currentLength = length;
+        currentHeader = ByteBuffer.allocate(4).putInt(length);
+        writes.add(currentHeader);
+        LOG.debug("planning write of {}", length);
+      }
+      currentHeader.flip();
+      writes.add(buffer);
+    }
+    zeroHeader.flip();                            // zero-terminate
+    writes.add(zeroHeader);
+
+    channel.write(writes.toArray(new ByteBuffer[writes.size()]));
+  }
+
+  private void write(Status status, String prefix, ByteBuffer response)
+    throws IOException {
+    LOG.debug("write status: {} {}", status, prefix);
+    write(status, prefix);
+    write(response);
+  }
+
+  private void write(Status status, String response) throws IOException {
+    write(status, ByteBuffer.wrap(response.getBytes("UTF-8")));
+  }
+
+  private void write(Status status, ByteBuffer response) throws IOException {
+    LOG.debug("write status: {}", status);
+    ByteBuffer statusBuffer = ByteBuffer.allocate(1);
+    statusBuffer.clear();
+    statusBuffer.put((byte)(status.ordinal())).flip();
+    channel.write(statusBuffer);
+    write(response);
+  }
+
+  private void write(ByteBuffer response) throws IOException {
+    LOG.debug("writing: {}", response.remaining());
+    writeHeader.clear();
+    writeHeader.putInt(response.remaining()).flip();
+    channel.write(new ByteBuffer[] { writeHeader, response });
+  }
+
+  @Override public void close() throws IOException {
+    if (channel.isOpen()) {
+      LOG.info("closing to "+getRemoteName());
+      channel.close();
+    }
+    sasl.dispose();
+  }
+
+  /**
+   * Used to abstract over the <code>SaslServer</code> and
+   * <code>SaslClient</code> classes, which share a lot of their interface, but
+   * unfortunately don't share a common superclass.
+   */
+  private static class SaslParticipant {
+    // One of these will always be null.
+    public SaslServer server;
+    public SaslClient client;
+
+    public SaslParticipant(SaslServer server) {
+      this.server = server;
+    }
+
+    public SaslParticipant(SaslClient client) {
+      this.client = client;
+    }
+
+    public String getMechanismName() {
+      if (client != null)
+        return client.getMechanismName();
+      else
+        return server.getMechanismName();
+    }
+
+    public boolean isComplete() {
+      if (client != null)
+        return client.isComplete();
+      else
+        return server.isComplete();
+    }
+
+    public void dispose() throws SaslException {
+      if (client != null)
+        client.dispose();
+      else
+        server.dispose();
+    }
+
+    public byte[] unwrap(byte[] buf) throws SaslException {
+      if (client != null)
+        return client.unwrap(buf, 0, buf.length);
+      else
+        return server.unwrap(buf, 0, buf.length);
+    }
+
+    public byte[] wrap(byte[] buf) throws SaslException {
+      return wrap(buf, 0, buf.length);
+    }
+
+    public byte[] wrap(byte[] buf, int start, int len) throws SaslException {
+      if (client != null)
+        return client.wrap(buf, start, len);
+      else
+        return server.wrap(buf, start, len);
+    }
+
+    public Object getNegotiatedProperty(String propName) {
+      if (client != null)
+        return client.getNegotiatedProperty(propName);
+      else
+        return server.getNegotiatedProperty(propName);
+    }
+
+    public byte[] evaluate(byte[] buf) throws SaslException {
+      if (client != null)
+        return client.evaluateChallenge(buf);
+      else
+        return server.evaluateResponse(buf);
+    }
+
+  }
+
+  private static class AnonymousClient implements SaslClient {
+    public String getMechanismName() { return "ANONYMOUS"; }
+    public boolean hasInitialResponse() { return true; }
+    public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
+      try {
+        return System.getProperty("user.name").getBytes("UTF-8");
+      } catch (IOException e) {
+        throw new SaslException(e.toString());
+      }
+    }
+    public boolean isComplete() { return true; }
+    public byte[] unwrap(byte[] incoming, int offset, int len) {
+      throw new UnsupportedOperationException();
+    }
+    public byte[] wrap(byte[] outgoing, int offset, int len) {
+      throw new UnsupportedOperationException();
+    }
+    public Object getNegotiatedProperty(String propName) { return null; }
+    public void dispose() {}
+  }
+}

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java?rev=998347&r1=998346&r2=998347&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java Fri Sep 17 22:02:48 2010
@@ -19,6 +19,7 @@
 package org.apache.avro.ipc;
 
 import java.io.IOException;
+import java.io.EOFException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
@@ -55,16 +56,22 @@ public class SocketServer extends Thread
 
   public void run() {
     LOG.info("starting "+channel.socket().getInetAddress());
-    while (true) {
+    try {
+      while (true) {
+        try {
+          new Connection(channel.accept());
+        } catch (ClosedChannelException e) {
+          return;
+        } catch (IOException e) {
+          LOG.warn("unexpected error", e);
+          throw new RuntimeException(e);
+        }
+      }
+    } finally {
+      LOG.info("stopping "+channel.socket().getInetAddress());
       try {
-        new Connection(channel.accept());
-      } catch (ClosedChannelException e) {
-        return;
+        channel.close();
       } catch (IOException e) {
-        LOG.warn("unexpected error", e);
-        throw new RuntimeException(e);
-      } finally {
-        LOG.info("stopping "+channel.socket().getInetAddress());
       }
     }
   }
@@ -73,10 +80,20 @@ public class SocketServer extends Thread
     group.interrupt();
   }
 
-  private class Connection extends SocketTransceiver implements Runnable {
+  /** Creates an appropriate {@link Transceiver} for this server.
+   * Returns a {@link SocketTransceiver} by default. */
+  protected Transceiver getTransceiver(SocketChannel channel)
+    throws IOException {
+    return new SocketTransceiver(channel);
+  }
+
+  private class Connection implements Runnable {
+
+    SocketChannel channel;
+    Transceiver xc;
 
     public Connection(SocketChannel channel) throws IOException {
-      super(channel);
+      this.channel = channel;
 
       Thread thread = new Thread(group, this);
       thread.setName("Connection to "+channel.socket().getRemoteSocketAddress());
@@ -87,13 +104,16 @@ public class SocketServer extends Thread
     public void run() {
       try {
         try {
+          this.xc = getTransceiver(channel);
           while (true) {
-            writeBuffers(responder.respond(readBuffers(), this));
+            xc.writeBuffers(responder.respond(xc.readBuffers(), xc));
           }
+        } catch (EOFException e) {
+          return;
         } catch (ClosedChannelException e) {
           return;
         } finally {
-          close();
+          channel.close();
         }
       } catch (IOException e) {
         LOG.warn("unexpected error", e);

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java?rev=998347&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslAnonymous.java Fri Sep 17 22:02:48 2010
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.TestProtocolGeneric;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.Before;
+
+public class TestSaslAnonymous extends TestProtocolGeneric {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestSaslAnonymous.class);
+
+  @Before
+  public void testStartServer() throws Exception {
+    if (server != null) return;
+    server = new SaslSocketServer(new TestResponder(),new InetSocketAddress(0));
+    server.start();
+    client = new SaslSocketTransceiver(new InetSocketAddress(server.getPort()));
+    requestor = new GenericRequestor(PROTOCOL, client);
+  }
+
+  @Override public void testHandshake() throws IOException {}
+
+}

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java?rev=998347&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/ipc/TestSaslDigestMd5.java Fri Sep 17 22:02:48 2010
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.net.InetSocketAddress;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.TestProtocolGeneric;
+import org.apache.avro.util.Utf8;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+public class TestSaslDigestMd5 extends TestProtocolGeneric {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestSaslDigestMd5.class);
+
+  private static final String HOST = "localhost";
+  private static final String SERVICE = "avro-test";
+  private static final String PRINCIPAL = "avro-test-principal";
+  private static final String PASSWORD = "super secret password";
+  private static final String REALM = "avro-test-realm";
+
+  private static final String DIGEST_MD5_MECHANISM = "DIGEST-MD5";
+  private static final Map<String, String> DIGEST_MD5_PROPS =
+    new HashMap<String, String>();
+
+  static {
+    DIGEST_MD5_PROPS.put(Sasl.QOP, "auth-int");
+    DIGEST_MD5_PROPS.put("com.sun.security.sasl.digest.realm", REALM);
+  }
+
+  private static class TestSaslCallbackHandler implements CallbackHandler {
+    @Override
+    public void handle(Callback[] callbacks)
+      throws IOException, UnsupportedCallbackException {
+      for (Callback c : callbacks) {
+        if (c instanceof NameCallback) {
+          ((NameCallback) c).setName(PRINCIPAL);
+        } else if (c instanceof PasswordCallback) {
+          ((PasswordCallback) c).setPassword(PASSWORD.toCharArray());
+        } else if (c instanceof AuthorizeCallback) {
+          ((AuthorizeCallback) c).setAuthorized(true);
+        } else if (c instanceof RealmCallback) {
+          ((RealmCallback) c).setText(REALM);
+        } else {
+          throw new UnsupportedCallbackException(c);
+        }
+      }
+    }
+  }
+
+  @Before
+  public void testStartServer() throws Exception {
+    if (server != null) return;
+    server = new SaslSocketServer
+      (new TestResponder(), new InetSocketAddress(0), DIGEST_MD5_MECHANISM,
+       SERVICE, HOST, DIGEST_MD5_PROPS, new TestSaslCallbackHandler());
+    server.start();
+    SaslClient saslClient = Sasl.createSaslClient
+      (new String[]{DIGEST_MD5_MECHANISM}, PRINCIPAL, SERVICE, HOST,
+       DIGEST_MD5_PROPS, new TestSaslCallbackHandler());
+    client = new SaslSocketTransceiver(new InetSocketAddress(server.getPort()),
+                                       saslClient);
+    requestor = new GenericRequestor(PROTOCOL, client);
+  }
+
+  @Test(expected=SaslException.class)
+  public void testAnonymousClient() throws Exception {
+    Server s = new SaslSocketServer
+      (new TestResponder(), new InetSocketAddress(0), DIGEST_MD5_MECHANISM,
+       SERVICE, HOST, DIGEST_MD5_PROPS, new TestSaslCallbackHandler());
+    s.start();
+    Transceiver c =
+      new SaslSocketTransceiver(new InetSocketAddress(s.getPort()));
+    GenericRequestor requestor = new GenericRequestor(PROTOCOL, c);
+    GenericRecord params = 
+      new GenericData.Record(PROTOCOL.getMessages().get("hello").getRequest());
+    params.put("greeting", "bob");
+    Utf8 response = (Utf8)requestor.request("hello", params);
+    assertEquals(new Utf8("goodbye"), response);
+    s.close();
+    c.close();
+  }
+
+
+  private static class WrongPasswordCallbackHandler implements CallbackHandler {
+    @Override
+    public void handle(Callback[] callbacks)
+      throws IOException, UnsupportedCallbackException {
+      for (Callback c : callbacks) {
+        if (c instanceof NameCallback) {
+          ((NameCallback) c).setName(PRINCIPAL);
+        } else if (c instanceof PasswordCallback) {
+          ((PasswordCallback) c).setPassword("wrong".toCharArray());
+        } else if (c instanceof AuthorizeCallback) {
+          ((AuthorizeCallback) c).setAuthorized(true);
+        } else if (c instanceof RealmCallback) {
+          ((RealmCallback) c).setText(REALM);
+        } else {
+          throw new UnsupportedCallbackException(c);
+        }
+      }
+    }
+  }
+
+  @Test(expected=SaslException.class)
+  public void testWrongPassword() throws Exception {
+    Server s = new SaslSocketServer
+      (new TestResponder(), new InetSocketAddress(0), DIGEST_MD5_MECHANISM,
+       SERVICE, HOST, DIGEST_MD5_PROPS, new TestSaslCallbackHandler());
+    s.start();
+    SaslClient saslClient = Sasl.createSaslClient
+      (new String[]{DIGEST_MD5_MECHANISM}, PRINCIPAL, SERVICE, HOST,
+       DIGEST_MD5_PROPS, new WrongPasswordCallbackHandler());
+    Transceiver c = new SaslSocketTransceiver
+      (new InetSocketAddress(server.getPort()), saslClient);
+    GenericRequestor requestor = new GenericRequestor(PROTOCOL, c);
+    GenericRecord params = 
+      new GenericData.Record(PROTOCOL.getMessages().get("hello").getRequest());
+    params.put("greeting", "bob");
+    Utf8 response = (Utf8)requestor.request("hello", params);
+    assertEquals(new Utf8("goodbye"), response);
+    s.close();
+    c.close();
+  }
+
+  @Override public void testHandshake() throws IOException {}
+
+}